You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by ag...@apache.org on 2021/04/28 23:44:49 UTC

[ignite-3] 06/07: IGNITE-14389 Added putAll and removeAll. Started cursor management: ranges and watches (WIP)

This is an automated email from the ASF dual-hosted git repository.

agura pushed a commit to branch ignite-14389
in repository https://gitbox.apache.org/repos/asf/ignite-3.git

commit 081058c1201fdb35803e0adeb72677fc9a63d08a
Author: Andrey Gura <ag...@apache.org>
AuthorDate: Fri Apr 23 02:35:28 2021 +0300

    IGNITE-14389 Added putAll and removeAll. Started cursor management: ranges and watches (WIP)
---
 .../metastorage/server/KeyValueStorage.java        |  10 +
 .../server/SimpleInMemoryKeyValueStorage.java      | 186 +++++++++-
 .../server/SimpleInMemoryKeyValueStorageTest.java  | 380 +++++++++++++++++++++
 3 files changed, 565 insertions(+), 11 deletions(-)

diff --git a/modules/metastorage-server/src/main/java/org/apache/ignite/internal/metastorage/server/KeyValueStorage.java b/modules/metastorage-server/src/main/java/org/apache/ignite/internal/metastorage/server/KeyValueStorage.java
index 0f18ece..526e4fb 100644
--- a/modules/metastorage-server/src/main/java/org/apache/ignite/internal/metastorage/server/KeyValueStorage.java
+++ b/modules/metastorage-server/src/main/java/org/apache/ignite/internal/metastorage/server/KeyValueStorage.java
@@ -31,11 +31,21 @@ public interface KeyValueStorage {
 
     void putAll(List<byte[]> keys, List<byte[]> values);
 
+    @NotNull
+    Collection<Entry> getAndPutAll(List<byte[]> keys, List<byte[]> values);
+
     void remove(byte[] key);
 
     @NotNull
     Entry getAndRemove(byte[] key);
 
+    void removeAll(List<byte[]> key);
+
+    @NotNull
+    Collection<Entry> getAndRemoveAll(List<byte[]> keys);
+
+    Iterator<Entry> range(byte[] keyFrom, byte[] keyTo);
+
     Iterator<Entry> iterate(byte[] key);
 
     //Iterator<Entry> iterate(long rev);
diff --git a/modules/metastorage-server/src/main/java/org/apache/ignite/internal/metastorage/server/SimpleInMemoryKeyValueStorage.java b/modules/metastorage-server/src/main/java/org/apache/ignite/internal/metastorage/server/SimpleInMemoryKeyValueStorage.java
index f532005..32f720e 100644
--- a/modules/metastorage-server/src/main/java/org/apache/ignite/internal/metastorage/server/SimpleInMemoryKeyValueStorage.java
+++ b/modules/metastorage-server/src/main/java/org/apache/ignite/internal/metastorage/server/SimpleInMemoryKeyValueStorage.java
@@ -1,15 +1,9 @@
 package org.apache.ignite.internal.metastorage.server;
 
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Comparator;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.NavigableMap;
-import java.util.NoSuchElementException;
-import java.util.TreeMap;
+import java.util.*;
+import java.util.function.Consumer;
+
+import org.apache.ignite.metastorage.common.Cursor;
 import org.jetbrains.annotations.NotNull;
 import org.jetbrains.annotations.TestOnly;
 
@@ -25,6 +19,8 @@ public class SimpleInMemoryKeyValueStorage implements KeyValueStorage {
 
     private final Watcher watcher;
 
+    private final List<Cursor<Entry>> rangeCursors = new ArrayList<>();
+
     private NavigableMap<byte[], List<Long>> keysIdx = new TreeMap<>(LEXICOGRAPHIC_COMPARATOR);
 
     private NavigableMap<Long, NavigableMap<byte[], Value>> revsIdx = new TreeMap<>();
@@ -66,8 +62,25 @@ public class SimpleInMemoryKeyValueStorage implements KeyValueStorage {
     @Override
     public void putAll(List<byte[]> keys, List<byte[]> values) {
         synchronized (mux) {
+            long curRev = rev + 1;
+
+            doPutAll(curRev, keys, values);
+        }
+    }
+
+    @Override
+    public @NotNull Collection<Entry> getAndPutAll(List<byte[]> keys, List<byte[]> values) {
+        Collection<Entry> res;
+
+        synchronized (mux) {
+            long curRev = rev + 1;
 
+            res = doGetAll(keys, curRev);
+
+            doPutAll(curRev, keys, values);
         }
+
+        return res;
     }
 
     @NotNull
@@ -119,6 +132,69 @@ public class SimpleInMemoryKeyValueStorage implements KeyValueStorage {
         }
     }
 
+    @Override
+    public void removeAll(List<byte[]> keys) {
+        synchronized (mux) {
+            long curRev = rev + 1;
+
+            List<byte[]> existingKeys = new ArrayList<>(keys.size());
+
+            List<byte[]> vals = new ArrayList<>(keys.size());
+
+            for (int i = 0; i < keys.size(); i++) {
+                byte[] key = keys.get(i);
+
+                Entry e = doGet(key, LATEST_REV, false);
+
+                if (e.empty() || e.tombstone())
+                    continue;
+
+                existingKeys.add(key);
+
+                vals.add(TOMBSTONE);
+            }
+
+            doPutAll(curRev, existingKeys, vals);
+        }
+    }
+
+    @Override
+    public @NotNull Collection<Entry> getAndRemoveAll(List<byte[]> keys) {
+        Collection<Entry> res = new ArrayList<>(keys.size());
+
+        synchronized (mux) {
+            long curRev = rev + 1;
+
+            List<byte[]> existingKeys = new ArrayList<>(keys.size());
+
+            List<byte[]> vals = new ArrayList<>(keys.size());
+
+            for (int i = 0; i < keys.size(); i++) {
+                byte[] key = keys.get(i);
+
+                Entry e = doGet(key, LATEST_REV, false);
+
+                res.add(e);
+
+                if (e.empty() || e.tombstone())
+                    continue;
+
+                existingKeys.add(key);
+
+                vals.add(TOMBSTONE);
+            }
+
+            doPutAll(curRev, existingKeys, vals);
+        }
+
+        return res;
+    }
+
+    @Override
+    public Iterator<Entry> range(byte[] keyFrom, byte[] keyTo) {
+        return null;
+    }
+
     @Override public Iterator<Entry> iterate(byte[] keyFrom) {
         synchronized (mux) {
             NavigableMap<byte[], List<Long>> tailMap = keysIdx.tailMap(keyFrom, true);
@@ -237,7 +313,7 @@ public class SimpleInMemoryKeyValueStorage implements KeyValueStorage {
     private Collection<Entry> doGetAll(List<byte[]> keys, long rev) {
         assert keys != null : "keys list can't be null.";
         assert !keys.isEmpty() : "keys list can't be empty.";
-        assert rev > 0 : "Revision must be positive.";
+        assert rev > 0 || rev == LATEST_REV: "Revision must be positive.";
 
         Collection<Entry> res = new ArrayList<>(keys.size());
 
@@ -344,6 +420,39 @@ public class SimpleInMemoryKeyValueStorage implements KeyValueStorage {
         return lastRev;
     }
 
+    private long doPutAll(long curRev, List<byte[]> keys, List<byte[]> bytesList) {
+        synchronized (mux) {
+            // Update revsIdx.
+            NavigableMap<byte[], Value> entries = new TreeMap<>(LEXICOGRAPHIC_COMPARATOR);
+
+            for (int i = 0; i < keys.size(); i++) {
+                byte[] key = keys.get(i);
+
+                byte[] bytes = bytesList.get(i);
+
+                long curUpdCntr = ++updCntr;
+
+                // Update keysIdx.
+                List<Long> revs = keysIdx.computeIfAbsent(key, k -> new ArrayList<>());
+
+                long lastRev = revs.isEmpty() ? 0 : lastRevision(revs);
+
+                revs.add(curRev);
+
+                Value val = new Value(bytes, curUpdCntr);
+
+                entries.put(key, val);
+
+                revsIdx.put(curRev, entries);
+            }
+
+            rev = curRev;
+
+            return curRev;
+        }
+    }
+
+
     private static boolean isPrefix(byte[] pref, byte[] term) {
         if (pref.length > term.length)
             return false;
@@ -368,4 +477,59 @@ public class SimpleInMemoryKeyValueStorage implements KeyValueStorage {
         return res;
     }
 
+    private class RangeCursor implements Cursor<Entry> {
+        private final byte[] keyFrom;
+        private final byte[] keyTo;
+        private final long rev;
+        private byte[] curKey;
+
+        public RangeCursor(byte[] keyFrom, byte[] keyTo, long rev) {
+            this.keyFrom = keyFrom;
+            this.keyTo = keyTo;
+            this.rev = rev;
+        }
+
+        @Override public void close() throws Exception {
+
+        }
+
+        @NotNull
+        @Override public Iterator<Entry> iterator() {
+            return new Iterator<Entry>() {
+                @Override public boolean hasNext() {
+                    synchronized (mux) {
+                        byte[] key = keysIdx.ceilingKey(curKey);
+
+                        return key != null;
+                    }
+                }
+
+                @Override public Entry next() {
+                    synchronized (mux) {
+                        Map.Entry<byte[], List<Long>> e = keysIdx.ceilingEntry(curKey);
+
+                        if (e == null)
+                            throw new NoSuchElementException();
+
+                        List<Long> revs = e.getValue();
+
+                        assert revs != null && !revs.isEmpty() :
+                                "Revisions should not be empty: [revs=" + revs + ']';
+
+                        //lastRevision(re)
+
+                        return null;
+                    }
+                }
+            };
+        }
+
+        @Override public void forEach(Consumer<? super Entry> action) {
+            Cursor.super.forEach(action);
+        }
+
+        @Override public Spliterator<Entry> spliterator() {
+            return Cursor.super.spliterator();
+        }
+    }
 }
diff --git a/modules/metastorage-server/src/test/java/org/apache/ignite/internal/metastorage/server/SimpleInMemoryKeyValueStorageTest.java b/modules/metastorage-server/src/test/java/org/apache/ignite/internal/metastorage/server/SimpleInMemoryKeyValueStorageTest.java
index fa130e6..4a73137 100644
--- a/modules/metastorage-server/src/test/java/org/apache/ignite/internal/metastorage/server/SimpleInMemoryKeyValueStorageTest.java
+++ b/modules/metastorage-server/src/test/java/org/apache/ignite/internal/metastorage/server/SimpleInMemoryKeyValueStorageTest.java
@@ -272,6 +272,191 @@ class SimpleInMemoryKeyValueStorageTest {
     }
 
     @Test
+    public void putAll() {
+        byte[] key1 = k(1);
+        byte[] val1 = kv(1, 1);
+
+        byte[] key2 = k(2);
+        byte[] val2_1 = kv(2, 21);
+        byte[] val2_2 = kv(2, 22);
+
+        byte[] key3 = k(3);
+        byte[] val3_1 = kv(3, 31);
+        byte[] val3_2 = kv(3, 32);
+
+        byte[] key4 = k(4);
+
+        assertEquals(0, storage.revision());
+        assertEquals(0, storage.updateCounter());
+
+        // Must be rewritten.
+        storage.put(key2, val2_1);
+
+        // Remove. Tombstone must be replaced by new value.
+        storage.put(key3, val3_1);
+        storage.remove(key3);
+
+        assertEquals(3, storage.revision());
+        assertEquals(3, storage.updateCounter());
+
+        storage.putAll(List.of(key1, key2, key3), List.of(val1, val2_2, val3_2));
+
+        assertEquals(4, storage.revision());
+        assertEquals(6, storage.updateCounter());
+
+        Collection<Entry> entries = storage.getAll(List.of(key1, key2, key3, key4));
+
+        assertEquals(4, entries.size());
+
+        Map<Key, Entry> map =  entries.stream().collect(Collectors.toMap(e -> new Key(e.key()), Function.identity()));
+
+        // Test regular put value.
+        Entry e1 = map.get(new Key(key1));
+
+        assertNotNull(e1);
+        assertEquals(4, e1.revision());
+        assertEquals(4, e1.updateCounter());
+        assertFalse(e1.tombstone());
+        assertFalse(e1.empty());
+        assertArrayEquals(val1, e1.value());
+
+        // Test rewritten value.
+        Entry e2 = map.get(new Key(key2));
+
+        assertNotNull(e2);
+        assertEquals(4, e2.revision());
+        assertEquals(5, e2.updateCounter());
+        assertFalse(e2.tombstone());
+        assertFalse(e2.empty());
+        assertArrayEquals(val2_2, e2.value());
+
+        // Test removed value.
+        Entry e3 = map.get(new Key(key3));
+
+        assertNotNull(e3);
+        assertEquals(4, e3.revision());
+        assertEquals(6, e3.updateCounter());
+        assertFalse(e3.tombstone());
+        assertFalse(e3.empty());
+
+        // Test empty value.
+        Entry e4 = map.get(new Key(key4));
+
+        assertNotNull(e4);
+        assertFalse(e4.tombstone());
+        assertTrue(e4.empty());
+    }
+
+    @Test
+    public void getAndPutAll() {
+        byte[] key1 = k(1);
+        byte[] val1 = kv(1, 1);
+
+        byte[] key2 = k(2);
+        byte[] val2_1 = kv(2, 21);
+        byte[] val2_2 = kv(2, 22);
+
+        byte[] key3 = k(3);
+        byte[] val3_1 = kv(3, 31);
+        byte[] val3_2 = kv(3, 32);
+
+        byte[] key4 = k(4);
+
+        assertEquals(0, storage.revision());
+        assertEquals(0, storage.updateCounter());
+
+        // Must be rewritten.
+        storage.put(key2, val2_1);
+
+        // Remove. Tombstone must be replaced by new value.
+        storage.put(key3, val3_1);
+        storage.remove(key3);
+
+        assertEquals(3, storage.revision());
+        assertEquals(3, storage.updateCounter());
+
+        Collection<Entry> entries = storage.getAndPutAll(List.of(key1, key2, key3), List.of(val1, val2_2, val3_2));
+
+        assertEquals(4, storage.revision());
+        assertEquals(6, storage.updateCounter());
+
+        assertEquals(3, entries.size());
+
+        Map<Key, Entry> map =  entries.stream().collect(Collectors.toMap(e -> new Key(e.key()), Function.identity()));
+
+        // Test regular put value.
+        Entry e1 = map.get(new Key(key1));
+
+        assertNotNull(e1);
+        assertEquals(0, e1.revision());
+        assertEquals(0, e1.updateCounter());
+        assertFalse(e1.tombstone());
+        assertTrue(e1.empty());
+
+        // Test rewritten value.
+        Entry e2 = map.get(new Key(key2));
+
+        assertNotNull(e2);
+        assertEquals(1, e2.revision());
+        assertEquals(1, e2.updateCounter());
+        assertFalse(e2.tombstone());
+        assertFalse(e2.empty());
+        assertArrayEquals(val2_1, e2.value());
+
+        // Test removed value.
+        Entry e3 = map.get(new Key(key3));
+
+        assertNotNull(e3);
+        assertEquals(3, e3.revision());
+        assertEquals(3, e3.updateCounter());
+        assertTrue(e3.tombstone());
+        assertFalse(e3.empty());
+
+        // Test state after getAndPutAll.
+        entries = storage.getAll(List.of(key1, key2, key3, key4));
+
+        assertEquals(4, entries.size());
+
+        map =  entries.stream().collect(Collectors.toMap(e -> new Key(e.key()), Function.identity()));
+
+        // Test regular put value.
+        e1 = map.get(new Key(key1));
+
+        assertNotNull(e1);
+        assertEquals(4, e1.revision());
+        assertEquals(4, e1.updateCounter());
+        assertFalse(e1.tombstone());
+        assertFalse(e1.empty());
+        assertArrayEquals(val1, e1.value());
+
+        // Test rewritten value.
+        e2 = map.get(new Key(key2));
+
+        assertNotNull(e2);
+        assertEquals(4, e2.revision());
+        assertEquals(5, e2.updateCounter());
+        assertFalse(e2.tombstone());
+        assertFalse(e2.empty());
+        assertArrayEquals(val2_2, e2.value());
+
+        // Test removed value.
+        e3 = map.get(new Key(key3));
+
+        assertNotNull(e3);
+        assertEquals(4, e3.revision());
+        assertEquals(6, e3.updateCounter());
+        assertFalse(e3.tombstone());
+        assertFalse(e3.empty());
+
+        // Test empty value.
+        Entry e4 = map.get(new Key(key4));
+
+        assertNotNull(e4);
+        assertFalse(e4.tombstone());
+        assertTrue(e4.empty());
+    }
+
+    @Test
     public void remove() {
         byte[] key = k(1);
         byte[] val = kv(1, 1);
@@ -377,6 +562,201 @@ class SimpleInMemoryKeyValueStorageTest {
     }
 
     @Test
+    public void removeAll() {
+        byte[] key1 = k(1);
+        byte[] val1 = kv(1, 1);
+
+        byte[] key2 = k(2);
+        byte[] val2_1 = kv(2, 21);
+        byte[] val2_2 = kv(2, 22);
+
+        byte[] key3 = k(3);
+        byte[] val3_1 = kv(3, 31);
+
+        byte[] key4 = k(4);
+
+        assertEquals(0, storage.revision());
+        assertEquals(0, storage.updateCounter());
+
+        // Regular put.
+        storage.put(key1, val1);
+
+        // Rewrite.
+        storage.put(key2, val2_1);
+        storage.put(key2, val2_2);
+
+        // Remove. Tombstone must not be removed again.
+        storage.put(key3, val3_1);
+        storage.remove(key3);
+
+        assertEquals(5, storage.revision());
+        assertEquals(5, storage.updateCounter());
+
+        storage.removeAll(List.of(key1, key2, key3, key4));
+
+        assertEquals(6, storage.revision());
+        assertEquals(7, storage.updateCounter()); // Only two keys are updated.
+
+        Collection<Entry> entries = storage.getAll(List.of(key1, key2, key3, key4));
+
+        assertEquals(4, entries.size());
+
+        Map<Key, Entry> map =  entries.stream().collect(Collectors.toMap(e -> new Key(e.key()), Function.identity()));
+
+        // Test regular put value.
+        Entry e1 = map.get(new Key(key1));
+
+        assertNotNull(e1);
+        assertEquals(6, e1.revision());
+        assertEquals(6, e1.updateCounter());
+        assertTrue(e1.tombstone());
+        assertFalse(e1.empty());
+
+        // Test rewritten value.
+        Entry e2 = map.get(new Key(key2));
+
+        assertNotNull(e2);
+        assertEquals(6, e2.revision());
+        assertEquals(7, e2.updateCounter());
+        assertTrue(e2.tombstone());
+        assertFalse(e2.empty());
+
+        // Test removed value.
+        Entry e3 = map.get(new Key(key3));
+
+        assertNotNull(e3);
+        assertEquals(5, e3.revision());
+        assertEquals(5, e3.updateCounter());
+        assertTrue(e3.tombstone());
+        assertFalse(e3.empty());
+
+        // Test empty value.
+        Entry e4 = map.get(new Key(key4));
+
+        assertNotNull(e4);
+        assertFalse(e4.tombstone());
+        assertTrue(e4.empty());
+    }
+
+    @Test
+    public void getAndRemoveAll() {
+        byte[] key1 = k(1);
+        byte[] val1 = kv(1, 1);
+
+        byte[] key2 = k(2);
+        byte[] val2_1 = kv(2, 21);
+        byte[] val2_2 = kv(2, 22);
+
+        byte[] key3 = k(3);
+        byte[] val3_1 = kv(3, 31);
+
+        byte[] key4 = k(4);
+
+        assertEquals(0, storage.revision());
+        assertEquals(0, storage.updateCounter());
+
+        // Regular put.
+        storage.put(key1, val1);
+
+        // Rewrite.
+        storage.put(key2, val2_1);
+        storage.put(key2, val2_2);
+
+        // Remove. Tombstone must not be removed again.
+        storage.put(key3, val3_1);
+        storage.remove(key3);
+
+        assertEquals(5, storage.revision());
+        assertEquals(5, storage.updateCounter());
+
+        Collection<Entry> entries = storage.getAndRemoveAll(List.of(key1, key2, key3, key4));
+
+        assertEquals(6, storage.revision());
+        assertEquals(7, storage.updateCounter()); // Only two keys are updated.
+
+        assertEquals(4, entries.size());
+
+        Map<Key, Entry> map =  entries.stream().collect(Collectors.toMap(e -> new Key(e.key()), Function.identity()));
+
+        // Test regular put value.
+        Entry e1 = map.get(new Key(key1));
+
+        assertNotNull(e1);
+        assertEquals(1, e1.revision());
+        assertEquals(1, e1.updateCounter());
+        assertFalse(e1.tombstone());
+        assertFalse(e1.empty());
+
+
+        // Test rewritten value.
+        Entry e2 = map.get(new Key(key2));
+
+        assertNotNull(e2);
+        assertEquals(3, e2.revision());
+        assertEquals(3, e2.updateCounter());
+        assertFalse(e2.tombstone());
+        assertFalse(e2.empty());
+
+
+        // Test removed value.
+        Entry e3 = map.get(new Key(key3));
+
+        assertNotNull(e3);
+        assertEquals(5, e3.revision());
+        assertEquals(5, e3.updateCounter());
+        assertTrue(e3.tombstone());
+        assertFalse(e3.empty());
+
+        // Test empty value.
+        Entry e4 = map.get(new Key(key4));
+
+        assertNotNull(e4);
+        assertFalse(e4.tombstone());
+        assertTrue(e4.empty());
+
+        // Test state after getAndRemoveAll.
+        entries = storage.getAll(List.of(key1, key2, key3, key4));
+
+        assertEquals(4, entries.size());
+
+        map =  entries.stream().collect(Collectors.toMap(e -> new Key(e.key()), Function.identity()));
+
+        // Test regular put value.
+        e1 = map.get(new Key(key1));
+
+        assertNotNull(e1);
+        assertEquals(6, e1.revision());
+        assertEquals(6, e1.updateCounter());
+        assertTrue(e1.tombstone());
+        assertFalse(e1.empty());
+
+        // Test rewritten value.
+        e2 = map.get(new Key(key2));
+
+        assertNotNull(e2);
+        assertEquals(6, e2.revision());
+        assertEquals(7, e2.updateCounter());
+        assertTrue(e2.tombstone());
+        assertFalse(e2.empty());
+
+        // Test removed value.
+        e3 = map.get(new Key(key3));
+
+        assertNotNull(e3);
+        assertEquals(5, e3.revision());
+        assertEquals(5, e3.updateCounter());
+        assertTrue(e3.tombstone());
+        assertFalse(e3.empty());
+
+        // Test empty value.
+        e4 = map.get(new Key(key4));
+
+        assertNotNull(e4);
+        assertFalse(e4.tombstone());
+        assertTrue(e4.empty());
+    }
+
+    @Test
     public void getAfterRemove() {
         byte[] key = k(1);
         byte[] val = kv(1, 1);