You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by ag...@apache.org on 2021/05/04 15:37:39 UTC
[ignite-3] 06/08: IGNITE-14389 Added putAll and removeAll. Started
cursor management: ranges and watches (WIP)
This is an automated email from the ASF dual-hosted git repository.
agura pushed a commit to branch ignite-14389
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
commit ee2c7adf51ff11148e9c359893217c86a866a9cc
Author: Andrey Gura <ag...@apache.org>
AuthorDate: Fri Apr 23 02:35:28 2021 +0300
IGNITE-14389 Added putAll and removeAll. Started cursor management: ranges and watches (WIP)
---
.../metastorage/server/KeyValueStorage.java | 10 +
.../server/SimpleInMemoryKeyValueStorage.java | 186 +++++++++-
.../server/SimpleInMemoryKeyValueStorageTest.java | 380 +++++++++++++++++++++
3 files changed, 565 insertions(+), 11 deletions(-)
diff --git a/modules/metastorage-server/src/main/java/org/apache/ignite/internal/metastorage/server/KeyValueStorage.java b/modules/metastorage-server/src/main/java/org/apache/ignite/internal/metastorage/server/KeyValueStorage.java
index 0f18ece..526e4fb 100644
--- a/modules/metastorage-server/src/main/java/org/apache/ignite/internal/metastorage/server/KeyValueStorage.java
+++ b/modules/metastorage-server/src/main/java/org/apache/ignite/internal/metastorage/server/KeyValueStorage.java
@@ -31,11 +31,21 @@ public interface KeyValueStorage {
void putAll(List<byte[]> keys, List<byte[]> values);
+ /**
+ * Writes every key/value pair and returns the entries that were stored for the keys before the write.
+ *
+ * @param keys Keys to update.
+ * @param values Values to store; {@code values.get(i)} is written for {@code keys.get(i)}.
+ * @return Entries as they were before this update.
+ */
+ @NotNull
+ Collection<Entry> getAndPutAll(List<byte[]> keys, List<byte[]> values);
+
void remove(byte[] key);
@NotNull
Entry getAndRemove(byte[] key);
+ /**
+ * Removes all given keys.
+ *
+ * @param keys Keys to remove.
+ */
+ void removeAll(List<byte[]> keys);
+
+ /**
+ * Removes all given keys and returns the entries that were stored for them before the removal.
+ *
+ * @param keys Keys to remove.
+ * @return Entries as they were before the removal.
+ */
+ @NotNull
+ Collection<Entry> getAndRemoveAll(List<byte[]> keys);
+
+ /**
+ * Intended to iterate over the entries whose keys fall between the two bounds.
+ * NOTE(review): bound inclusivity and the handling of {@code null} bounds are not defined by this change;
+ * confirm once range cursors are implemented.
+ *
+ * @param keyFrom Start of the key range.
+ * @param keyTo End of the key range.
+ * @return Iterator over the matching entries.
+ */
+ Iterator<Entry> range(byte[] keyFrom, byte[] keyTo);
+
Iterator<Entry> iterate(byte[] key);
//Iterator<Entry> iterate(long rev);
diff --git a/modules/metastorage-server/src/main/java/org/apache/ignite/internal/metastorage/server/SimpleInMemoryKeyValueStorage.java b/modules/metastorage-server/src/main/java/org/apache/ignite/internal/metastorage/server/SimpleInMemoryKeyValueStorage.java
index f532005..32f720e 100644
--- a/modules/metastorage-server/src/main/java/org/apache/ignite/internal/metastorage/server/SimpleInMemoryKeyValueStorage.java
+++ b/modules/metastorage-server/src/main/java/org/apache/ignite/internal/metastorage/server/SimpleInMemoryKeyValueStorage.java
@@ -1,15 +1,9 @@
package org.apache.ignite.internal.metastorage.server;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Comparator;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.NavigableMap;
-import java.util.NoSuchElementException;
-import java.util.TreeMap;
+import java.util.*;
+import java.util.function.Consumer;
+
+import org.apache.ignite.metastorage.common.Cursor;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.TestOnly;
@@ -25,6 +19,8 @@ public class SimpleInMemoryKeyValueStorage implements KeyValueStorage {
private final Watcher watcher;
+ private final List<Cursor<Entry>> rangeCursors = new ArrayList<>();
+
private NavigableMap<byte[], List<Long>> keysIdx = new TreeMap<>(LEXICOGRAPHIC_COMPARATOR);
private NavigableMap<Long, NavigableMap<byte[], Value>> revsIdx = new TreeMap<>();
@@ -66,8 +62,25 @@ public class SimpleInMemoryKeyValueStorage implements KeyValueStorage {
@Override
public void putAll(List<byte[]> keys, List<byte[]> values) {
synchronized (mux) {
+ // The whole batch is written under one new revision: the current revision plus one.
+ long curRev = rev + 1;
+
+ doPutAll(curRev, keys, values);
+ }
+ }
+
+ /**
+ * Writes all pairs under one new revision and returns the entries read for the keys before the write.
+ *
+ * @param keys Keys to update.
+ * @param values Values to store, matched to {@code keys} by index.
+ * @return Entries read for {@code keys} before the batch was written.
+ */
+ @Override
+ public @NotNull Collection<Entry> getAndPutAll(List<byte[]> keys, List<byte[]> values) {
+ Collection<Entry> res;
+
+ synchronized (mux) {
+ long curRev = rev + 1;
+ // Read first, then write, so the caller receives the state that preceded this update.
+ // NOTE(review): the read uses curRev, which has not been written yet; presumably it resolves
+ // to the newest revision not greater than curRev - confirm against doGetAll/doGet.
+ res = doGetAll(keys, curRev);
+
+ doPutAll(curRev, keys, values);
}
+
+ return res;
}
@NotNull
@@ -119,6 +132,69 @@ public class SimpleInMemoryKeyValueStorage implements KeyValueStorage {
}
}
+ /**
+ * Writes a tombstone for every key that currently holds a live value; the whole batch shares one new revision.
+ * Keys whose latest entry is empty or already a tombstone are skipped.
+ *
+ * @param keys Keys to remove.
+ */
+ @Override
+ public void removeAll(List<byte[]> keys) {
+ synchronized (mux) {
+ long nextRev = rev + 1;
+
+ int cap = keys.size();
+
+ List<byte[]> liveKeys = new ArrayList<>(cap);
+
+ List<byte[]> tombstones = new ArrayList<>(cap);
+
+ for (byte[] k : keys) {
+ Entry cur = doGet(k, LATEST_REV, false);
+
+ // Only keys with a live value get a tombstone written.
+ if (!cur.empty() && !cur.tombstone()) {
+ liveKeys.add(k);
+
+ tombstones.add(TOMBSTONE);
+ }
+ }
+
+ doPutAll(nextRev, liveKeys, tombstones);
+ }
+ }
+
+ /**
+ * Writes a tombstone for every key that currently holds a live value and returns what was read for each key.
+ * The result holds one entry per requested key, including keys that were skipped because their latest entry
+ * was empty or already a tombstone.
+ *
+ * @param keys Keys to remove.
+ * @return Entries read for {@code keys} before the tombstones were written.
+ */
+ @Override
+ public @NotNull Collection<Entry> getAndRemoveAll(List<byte[]> keys) {
+ Collection<Entry> prev = new ArrayList<>(keys.size());
+
+ synchronized (mux) {
+ long nextRev = rev + 1;
+
+ int cap = keys.size();
+
+ List<byte[]> liveKeys = new ArrayList<>(cap);
+
+ List<byte[]> tombstones = new ArrayList<>(cap);
+
+ for (byte[] k : keys) {
+ Entry cur = doGet(k, LATEST_REV, false);
+
+ // Every lookup result is reported, whether or not the key ends up being removed.
+ prev.add(cur);
+
+ if (!cur.empty() && !cur.tombstone()) {
+ liveKeys.add(k);
+
+ tombstones.add(TOMBSTONE);
+ }
+ }
+
+ doPutAll(nextRev, liveKeys, tombstones);
+ }
+
+ return prev;
+ }
+
+ /**
+ * Placeholder: range iteration is not implemented yet, so this always returns {@code null}
+ * and ignores both bounds.
+ * NOTE(review): callers cannot iterate the result until a real cursor is returned here.
+ */
+ @Override
+ public Iterator<Entry> range(byte[] keyFrom, byte[] keyTo) {
+ return null;
+ }
+
@Override public Iterator<Entry> iterate(byte[] keyFrom) {
synchronized (mux) {
NavigableMap<byte[], List<Long>> tailMap = keysIdx.tailMap(keyFrom, true);
@@ -237,7 +313,7 @@ public class SimpleInMemoryKeyValueStorage implements KeyValueStorage {
private Collection<Entry> doGetAll(List<byte[]> keys, long rev) {
assert keys != null : "keys list can't be null.";
assert !keys.isEmpty() : "keys list can't be empty.";
- assert rev > 0 : "Revision must be positive.";
+ assert rev > 0 || rev == LATEST_REV: "Revision must be positive.";
Collection<Entry> res = new ArrayList<>(keys.size());
@@ -344,6 +420,39 @@ public class SimpleInMemoryKeyValueStorage implements KeyValueStorage {
return lastRev;
}
+ /**
+ * Writes the given key/value pairs under revision {@code curRev} and makes it the current revision.
+ * All pairs share the revision; each pair consumes its own update counter value.
+ *
+ * @param curRev Revision assigned to every written pair.
+ * @param keys Keys to write.
+ * @param bytesList Values to write; {@code bytesList.get(i)} is stored for {@code keys.get(i)}.
+ * @return The revision the pairs were written under, i.e. {@code curRev}.
+ */
+ private long doPutAll(long curRev, List<byte[]> keys, List<byte[]> bytesList) {
+ assert keys.size() == bytesList.size() : "Keys and values lists must have the same size.";
+
+ synchronized (mux) {
+ // Entries written under this revision, ordered by key.
+ NavigableMap<byte[], Value> entries = new TreeMap<>(LEXICOGRAPHIC_COMPARATOR);
+
+ for (int i = 0; i < keys.size(); i++) {
+ byte[] key = keys.get(i);
+
+ byte[] bytes = bytesList.get(i);
+
+ long curUpdCntr = ++updCntr;
+
+ // Update keysIdx: record that the key changed in this revision.
+ keysIdx.computeIfAbsent(key, k -> new ArrayList<>()).add(curRev);
+
+ entries.put(key, new Value(bytes, curUpdCntr));
+ }
+
+ // Update revsIdx once per batch instead of on every iteration.
+ // An empty batch registers nothing, exactly as before.
+ if (!entries.isEmpty())
+ revsIdx.put(curRev, entries);
+
+ rev = curRev;
+
+ return curRev;
+ }
+ }
+
+
private static boolean isPrefix(byte[] pref, byte[] term) {
if (pref.length > term.length)
return false;
@@ -368,4 +477,59 @@ public class SimpleInMemoryKeyValueStorage implements KeyValueStorage {
return res;
}
+ /**
+ * Work-in-progress cursor over a key range at a given revision.
+ * The bounds and the revision are stored by the constructor but are not read by the iterator yet,
+ * and {@code next()} does not produce entries yet.
+ */
+ private class RangeCursor implements Cursor<Entry> {
+ /** Start of the key range (stored, not used yet). */
+ private final byte[] keyFrom;
+ /** End of the key range (stored, not used yet). */
+ private final byte[] keyTo;
+ /** Revision the cursor was created for (stored, not used yet). */
+ private final long rev;
+ /** Current position of the cursor; not assigned anywhere in this class, so it keeps its initial {@code null}. */
+ private byte[] curKey;
+
+ /**
+ * @param keyFrom Start of the key range.
+ * @param keyTo End of the key range.
+ * @param rev Revision the cursor is bound to.
+ */
+ public RangeCursor(byte[] keyFrom, byte[] keyTo, long rev) {
+ this.keyFrom = keyFrom;
+ this.keyTo = keyTo;
+ this.rev = rev;
+ }
+
+ /** Does nothing at the moment. */
+ @Override public void close() throws Exception {
+
+ }
+
+ /** Returns an iterator that looks keys up in {@code keysIdx} under the storage mutex. */
+ @NotNull
+ @Override public Iterator<Entry> iterator() {
+ return new Iterator<Entry>() {
+ @Override public boolean hasNext() {
+ synchronized (mux) {
+ // NOTE(review): curKey is never advanced, so this asks for the same key on every call;
+ // it is also null here - confirm LEXICOGRAPHIC_COMPARATOR tolerates a null argument.
+ byte[] key = keysIdx.ceilingKey(curKey);
+
+ return key != null;
+ }
+ }
+
+ @Override public Entry next() {
+ synchronized (mux) {
+ Map.Entry<byte[], List<Long>> e = keysIdx.ceilingEntry(curKey);
+
+ if (e == null)
+ throw new NoSuchElementException();
+
+ List<Long> revs = e.getValue();
+
+ assert revs != null && !revs.isEmpty() :
+ "Revisions should not be empty: [revs=" + revs + ']';
+
+ //lastRevision(re)
+
+ // Entry construction is not implemented yet: null is returned for every element.
+ return null;
+ }
+ }
+ };
+ }
+
+ /** Delegates to the default implementation inherited through {@code Cursor}. */
+ @Override public void forEach(Consumer<? super Entry> action) {
+ Cursor.super.forEach(action);
+ }
+
+ /** Delegates to the default implementation inherited through {@code Cursor}. */
+ @Override public Spliterator<Entry> spliterator() {
+ return Cursor.super.spliterator();
+ }
+ }
}
diff --git a/modules/metastorage-server/src/test/java/org/apache/ignite/internal/metastorage/server/SimpleInMemoryKeyValueStorageTest.java b/modules/metastorage-server/src/test/java/org/apache/ignite/internal/metastorage/server/SimpleInMemoryKeyValueStorageTest.java
index fa130e6..4a73137 100644
--- a/modules/metastorage-server/src/test/java/org/apache/ignite/internal/metastorage/server/SimpleInMemoryKeyValueStorageTest.java
+++ b/modules/metastorage-server/src/test/java/org/apache/ignite/internal/metastorage/server/SimpleInMemoryKeyValueStorageTest.java
@@ -272,6 +272,191 @@ class SimpleInMemoryKeyValueStorageTest {
}
+ /**
+ * Checks that {@code putAll} writes a new key, overwrites an existing value and replaces a tombstone,
+ * all under one new revision and with one update counter increment per key.
+ */
@Test
+ public void putAll() {
+ byte[] key1 = k(1);
+ byte[] val1 = kv(1, 1);
+
+ byte[] key2 = k(2);
+ byte[] val2_1 = kv(2, 21);
+ byte[] val2_2 = kv(2, 22);
+
+ byte[] key3 = k(3);
+ byte[] val3_1 = kv(3, 31);
+ byte[] val3_2 = kv(3, 32);
+
+ byte[] key4 = k(4);
+
+ assertEquals(0, storage.revision());
+ assertEquals(0, storage.updateCounter());
+
+ // Must be rewritten.
+ storage.put(key2, val2_1);
+
+ // Remove. Tombstone must be replaced by new value.
+ storage.put(key3, val3_1);
+ storage.remove(key3);
+
+ assertEquals(3, storage.revision());
+ assertEquals(3, storage.updateCounter());
+
+ storage.putAll(List.of(key1, key2, key3), List.of(val1, val2_2, val3_2));
+
+ // One revision for the whole batch, one update counter value per key.
+ assertEquals(4, storage.revision());
+ assertEquals(6, storage.updateCounter());
+
+ Collection<Entry> entries = storage.getAll(List.of(key1, key2, key3, key4));
+
+ assertEquals(4, entries.size());
+
+ Map<Key, Entry> map = entries.stream().collect(Collectors.toMap(e -> new Key(e.key()), Function.identity()));
+
+ // Test regular put value.
+ Entry e1 = map.get(new Key(key1));
+
+ assertNotNull(e1);
+ assertEquals(4, e1.revision());
+ assertEquals(4, e1.updateCounter());
+ assertFalse(e1.tombstone());
+ assertFalse(e1.empty());
+ assertArrayEquals(val1, e1.value());
+
+ // Test rewritten value.
+ Entry e2 = map.get(new Key(key2));
+
+ assertNotNull(e2);
+ assertEquals(4, e2.revision());
+ assertEquals(5, e2.updateCounter());
+ assertFalse(e2.tombstone());
+ assertFalse(e2.empty());
+ assertArrayEquals(val2_2, e2.value());
+
+ // Test removed value: the tombstone must be replaced by the new value.
+ Entry e3 = map.get(new Key(key3));
+
+ assertNotNull(e3);
+ assertEquals(4, e3.revision());
+ assertEquals(6, e3.updateCounter());
+ assertFalse(e3.tombstone());
+ assertFalse(e3.empty());
+ assertArrayEquals(val3_2, e3.value());
+
+ // Test empty value.
+ Entry e4 = map.get(new Key(key4));
+
+ assertNotNull(e4);
+ assertFalse(e4.tombstone());
+ assertTrue(e4.empty());
+ }
+
+ /**
+ * Checks that {@code getAndPutAll} returns the entries as they were before the update
+ * (empty, regular value, tombstone) and that the storage afterwards holds the new values.
+ */
+ @Test
+ public void getAndPutAll() {
+ byte[] key1 = k(1);
+ byte[] val1 = kv(1, 1);
+
+ byte[] key2 = k(2);
+ byte[] val2_1 = kv(2, 21);
+ byte[] val2_2 = kv(2, 22);
+
+ byte[] key3 = k(3);
+ byte[] val3_1 = kv(3, 31);
+ byte[] val3_2 = kv(3, 32);
+
+ byte[] key4 = k(4);
+
+ assertEquals(0, storage.revision());
+ assertEquals(0, storage.updateCounter());
+
+ // Must be rewritten.
+ storage.put(key2, val2_1);
+
+ // Remove. Tombstone must be replaced by new value.
+ storage.put(key3, val3_1);
+ storage.remove(key3);
+
+ assertEquals(3, storage.revision());
+ assertEquals(3, storage.updateCounter());
+
+ Collection<Entry> entries = storage.getAndPutAll(List.of(key1, key2, key3), List.of(val1, val2_2, val3_2));
+
+ // One revision for the whole batch, one update counter value per key.
+ assertEquals(4, storage.revision());
+ assertEquals(6, storage.updateCounter());
+
+ assertEquals(3, entries.size());
+
+ Map<Key, Entry> map = entries.stream().collect(Collectors.toMap(e -> new Key(e.key()), Function.identity()));
+
+ // Test regular put value: the key did not exist before the update.
+ Entry e1 = map.get(new Key(key1));
+
+ assertNotNull(e1);
+ assertEquals(0, e1.revision());
+ assertEquals(0, e1.updateCounter());
+ assertFalse(e1.tombstone());
+ assertTrue(e1.empty());
+
+ // Test rewritten value: the previous value is returned.
+ Entry e2 = map.get(new Key(key2));
+
+ assertNotNull(e2);
+ assertEquals(1, e2.revision());
+ assertEquals(1, e2.updateCounter());
+ assertFalse(e2.tombstone());
+ assertFalse(e2.empty());
+ assertArrayEquals(val2_1, e2.value());
+
+ // Test removed value: the previous tombstone is returned.
+ Entry e3 = map.get(new Key(key3));
+
+ assertNotNull(e3);
+ assertEquals(3, e3.revision());
+ assertEquals(3, e3.updateCounter());
+ assertTrue(e3.tombstone());
+ assertFalse(e3.empty());
+
+ // Test state after putAll.
+ entries = storage.getAll(List.of(key1, key2, key3, key4));
+
+ assertEquals(4, entries.size());
+
+ map = entries.stream().collect(Collectors.toMap(e -> new Key(e.key()), Function.identity()));
+
+ // Test regular put value.
+ e1 = map.get(new Key(key1));
+
+ assertNotNull(e1);
+ assertEquals(4, e1.revision());
+ assertEquals(4, e1.updateCounter());
+ assertFalse(e1.tombstone());
+ assertFalse(e1.empty());
+ assertArrayEquals(val1, e1.value());
+
+ // Test rewritten value.
+ e2 = map.get(new Key(key2));
+
+ assertNotNull(e2);
+ assertEquals(4, e2.revision());
+ assertEquals(5, e2.updateCounter());
+ assertFalse(e2.tombstone());
+ assertFalse(e2.empty());
+ assertArrayEquals(val2_2, e2.value());
+
+ // Test removed value: the tombstone must be replaced by the new value.
+ e3 = map.get(new Key(key3));
+
+ assertNotNull(e3);
+ assertEquals(4, e3.revision());
+ assertEquals(6, e3.updateCounter());
+ assertFalse(e3.tombstone());
+ assertFalse(e3.empty());
+ assertArrayEquals(val3_2, e3.value());
+
+ // Test empty value.
+ Entry e4 = map.get(new Key(key4));
+
+ assertNotNull(e4);
+ assertFalse(e4.tombstone());
+ assertTrue(e4.empty());
+ }
+
+ @Test
public void remove() {
byte[] key = k(1);
byte[] val = kv(1, 1);
@@ -377,6 +562,201 @@ class SimpleInMemoryKeyValueStorageTest {
}
+ /**
+ * Checks that {@code removeAll} writes tombstones only for keys that hold a live value:
+ * an already removed key and a key that was never written must stay untouched.
+ */
@Test
+ public void removeAll() {
+ byte[] key1 = k(1);
+ byte[] val1 = kv(1, 1);
+
+ byte[] key2 = k(2);
+ byte[] val2_1 = kv(2, 21);
+ byte[] val2_2 = kv(2, 22);
+
+ byte[] key3 = k(3);
+ byte[] val3_1 = kv(3, 31);
+
+ byte[] key4 = k(4);
+
+ assertEquals(0, storage.revision());
+ assertEquals(0, storage.updateCounter());
+
+ // Regular put.
+ storage.put(key1, val1);
+
+ // Rewrite.
+ storage.put(key2, val2_1);
+ storage.put(key2, val2_2);
+
+ // Remove. Tombstone must not be removed again.
+ storage.put(key3, val3_1);
+ storage.remove(key3);
+
+ assertEquals(5, storage.revision());
+ assertEquals(5, storage.updateCounter());
+
+ storage.removeAll(List.of(key1, key2, key3, key4));
+
+ // The batch takes one revision; the update counter grows once per key actually removed.
+ assertEquals(6, storage.revision());
+ assertEquals(7, storage.updateCounter()); // Only two keys are updated.
+
+ Collection<Entry> entries = storage.getAll(List.of(key1, key2, key3, key4));
+
+ assertEquals(4, entries.size());
+
+ Map<Key, Entry> map = entries.stream().collect(Collectors.toMap(e -> new Key(e.key()), Function.identity()));
+
+ // Test regular put value: it must now be a tombstone written by the batch.
+ Entry e1 = map.get(new Key(key1));
+
+ assertNotNull(e1);
+ assertEquals(6, e1.revision());
+ assertEquals(6, e1.updateCounter());
+ assertTrue(e1.tombstone());
+ assertFalse(e1.empty());
+
+ // Test rewritten value: it must now be a tombstone written by the batch.
+ Entry e2 = map.get(new Key(key2));
+
+ assertNotNull(e2);
+ assertEquals(6, e2.revision());
+ assertEquals(7, e2.updateCounter());
+ assertTrue(e2.tombstone());
+ assertFalse(e2.empty());
+
+ // Test removed value: the old tombstone keeps its original revision and update counter.
+ Entry e3 = map.get(new Key(key3));
+
+ assertNotNull(e3);
+ assertEquals(5, e3.revision());
+ assertEquals(5, e3.updateCounter());
+ assertTrue(e3.tombstone());
+ assertFalse(e3.empty());
+
+ // Test empty value.
+ Entry e4 = map.get(new Key(key4));
+
+ assertNotNull(e4);
+ assertFalse(e4.tombstone());
+ assertTrue(e4.empty());
+ }
+
+ /**
+ * Checks that {@code getAndRemoveAll} returns the entries as they were before the removal
+ * and that afterwards only keys that held a live value have been turned into new tombstones.
+ */
+ @Test
+ public void getAndRemoveAll() {
+ byte[] key1 = k(1);
+ byte[] val1 = kv(1, 1);
+
+ byte[] key2 = k(2);
+ byte[] val2_1 = kv(2, 21);
+ byte[] val2_2 = kv(2, 22);
+
+ byte[] key3 = k(3);
+ byte[] val3_1 = kv(3, 31);
+
+ byte[] key4 = k(4);
+
+ assertEquals(0, storage.revision());
+ assertEquals(0, storage.updateCounter());
+
+ // Regular put.
+ storage.put(key1, val1);
+
+ // Rewrite.
+ storage.put(key2, val2_1);
+ storage.put(key2, val2_2);
+
+ // Remove. Tombstone must not be removed again.
+ storage.put(key3, val3_1);
+ storage.remove(key3);
+
+ assertEquals(5, storage.revision());
+ assertEquals(5, storage.updateCounter());
+
+ Collection<Entry> entries = storage.getAndRemoveAll(List.of(key1, key2, key3, key4));
+
+ // The batch takes one revision; the update counter grows once per key actually removed.
+ assertEquals(6, storage.revision());
+ assertEquals(7, storage.updateCounter()); // Only two keys are updated.
+
+ assertEquals(4, entries.size());
+
+ Map<Key, Entry> map = entries.stream().collect(Collectors.toMap(e -> new Key(e.key()), Function.identity()));
+
+ // Test regular put value: the entry that existed before the removal is returned.
+ Entry e1 = map.get(new Key(key1));
+
+ assertNotNull(e1);
+ assertEquals(1, e1.revision());
+ assertEquals(1, e1.updateCounter());
+ assertFalse(e1.tombstone());
+ assertFalse(e1.empty());
+
+
+ // Test rewritten value: the latest entry before the removal is returned.
+ Entry e2 = map.get(new Key(key2));
+
+ assertNotNull(e2);
+ assertEquals(3, e2.revision());
+ assertEquals(3, e2.updateCounter());
+ assertFalse(e2.tombstone());
+ assertFalse(e2.empty());
+
+
+ // Test removed value: the existing tombstone is returned.
+ Entry e3 = map.get(new Key(key3));
+
+ assertNotNull(e3);
+ assertEquals(5, e3.revision());
+ assertEquals(5, e3.updateCounter());
+ assertTrue(e3.tombstone());
+ assertFalse(e3.empty());
+
+ // Test empty value.
+ Entry e4 = map.get(new Key(key4));
+
+ assertNotNull(e4);
+ assertFalse(e4.tombstone());
+ assertTrue(e4.empty());
+
+ // Test state after getAndRemoveAll.
+ entries = storage.getAll(List.of(key1, key2, key3, key4));
+
+ assertEquals(4, entries.size());
+
+ map = entries.stream().collect(Collectors.toMap(e -> new Key(e.key()), Function.identity()));
+
+ // Test regular put value: it must now be a tombstone written by the batch.
+ e1 = map.get(new Key(key1));
+
+ assertNotNull(e1);
+ assertEquals(6, e1.revision());
+ assertEquals(6, e1.updateCounter());
+ assertTrue(e1.tombstone());
+ assertFalse(e1.empty());
+
+ // Test rewritten value: it must now be a tombstone written by the batch.
+ e2 = map.get(new Key(key2));
+
+ assertNotNull(e2);
+ assertEquals(6, e2.revision());
+ assertEquals(7, e2.updateCounter());
+ assertTrue(e2.tombstone());
+ assertFalse(e2.empty());
+
+ // Test removed value: the old tombstone keeps its original revision and update counter.
+ e3 = map.get(new Key(key3));
+
+ assertNotNull(e3);
+ assertEquals(5, e3.revision());
+ assertEquals(5, e3.updateCounter());
+ assertTrue(e3.tombstone());
+ assertFalse(e3.empty());
+
+ // Test empty value.
+ e4 = map.get(new Key(key4));
+
+ assertNotNull(e4);
+ assertFalse(e4.tombstone());
+ assertTrue(e4.empty());
+ }
+
+ @Test
public void getAfterRemove() {
byte[] key = k(1);
byte[] val = kv(1, 1);