You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by ag...@apache.org on 2021/05/04 15:37:33 UTC

[ignite-3] branch ignite-14389 updated (bab8272 -> 73176e2)

This is an automated email from the ASF dual-hosted git repository.

agura pushed a change to branch ignite-14389
in repository https://gitbox.apache.org/repos/asf/ignite-3.git.


 discard bab8272  IGNITE-14389 Implemented cursor for ranges and watches.
 discard 081058c  IGNITE-14389 Added putAll and removeAll. Started cursor management: ranges and watches (WIP)
 discard de8eb56  IGNITE-14389 putAll initial (WIP)
 discard 9b8366d  IGNITE-14389 getAll and tests (WIP)
 discard fbc4e57  IGNITE-14389 Added get and do smth semantic
 discard 097fa8f  IGNITE-14398: Meta storage: added update counter
 discard 8200c1a  IGNITE-14389 Meta storage: in-memory implementation WIP
     add 38e7683  IGNITE-14446 Added support of watch and put/get/scan operations to MetaStorageManager. Fixes #111
     add 4d5b16e  IGNITE-14672 Added SQL related schemas for configuration. Fixes #117
     new cd44be6  IGNITE-14389 Meta storage: in-memory implementation WIP
     new a58026e  IGNITE-14398: Meta storage: added update counter
     new 52271be  IGNITE-14389 Added get and do smth semantic
     new c6932ea  IGNITE-14389 getAll and tests (WIP)
     new 9bd4087  IGNITE-14389 putAll initial (WIP)
     new ee2c7ad  IGNITE-14389 Added putAll and removeAll. Started cursor management: ranges and watches (WIP)
     new c465eff  IGNITE-14389 Implemented cursor for ranges and watches.
     new 73176e2  IGNITE-14389 Implemented conditional update (invoke)

This update added new revisions after undoing existing revisions.
That is to say, some revisions that were in the old version of the
branch are not in the new version.  This situation occurs
when a user --force pushes a change and generates a repository
containing something like this:

 * -- * -- B -- O -- O -- O   (bab8272)
            \
             N -- N -- N   refs/heads/ignite-14389 (73176e2)

You should already have received notification emails for all of the O
revisions, and so the following emails describe only the N revisions
from the common base, B.

Any revisions marked "omit" are not gone; other references still
refer to them.  Any revisions marked "discard" are gone forever.

The 8 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../ignite/internal/affinity/AffinityManager.java  |  12 +-
 ...nSchema.java => ColumnConfigurationSchema.java} |  30 +-
 .../table/ColumnTypeConfigurationSchema.java}      |  24 +-
 .../table/IndexColumnConfigurationSchema.java}     |  20 +-
 .../schemas/table/TableConfigurationSchema.java    |   9 +
 ...ema.java => TableIndexConfigurationSchema.java} |  40 ++-
 .../metastorage/client/MetaStorageService.java     |  27 +-
 .../ignite/metastorage/common/Condition.java       | 133 +++-----
 .../ignite/metastorage/common/Conditions.java      |  10 +-
 .../ignite/metastorage/common/Operation.java       |  35 ++-
 .../ignite/metastorage/common/Operations.java      |  10 +-
 modules/metastorage-server/pom.xml                 |   6 +-
 .../metastorage/server/AbstractCondition.java      |  13 +
 .../internal/metastorage/server/Condition.java     |   7 +
 .../metastorage/server/KeyValueStorage.java        |   2 +
 .../internal/metastorage/server/Operation.java     |  31 ++
 .../metastorage/server/RevisionCondition.java      |  75 +++++
 .../server/SimpleInMemoryKeyValueStorage.java      |  95 +++++-
 .../metastorage/server/ValueCondition.java         |  49 +++
 .../metastorage/server/RevisionConditionTest.java  |  65 ++++
 .../server/SimpleInMemoryKeyValueStorageTest.java  | 323 ++++++++++++++++++++
 .../metastorage/server/ValueConditionTest.java     |  27 ++
 modules/metastorage/pom.xml                        |  23 ++
 .../internal/metastorage/MetaStorageManager.java   | 338 ++++++++++++++++++---
 .../metastorage/watch/AggregatedWatch.java         |  69 +++++
 .../internal/metastorage/watch/KeyCriterion.java   | 162 ++++++++++
 .../metastorage/watch/WatchAggregator.java         | 244 +++++++++++++++
 .../internal/metastorage/WatchAggregatorTest.java  | 131 ++++++++
 .../apache/ignite/internal/app/IgnitionImpl.java   | 121 +++++++-
 .../internal/table/distributed/TableManager.java   |  12 +-
 30 files changed, 1903 insertions(+), 240 deletions(-)
 copy modules/api/src/main/java/org/apache/ignite/configuration/schemas/table/{TableConfigurationSchema.java => ColumnConfigurationSchema.java} (72%)
 copy modules/{configuration-annotation-processor/src/test/java/org/apache/ignite/configuration/sample/DiscoveryConfigurationSchema.java => api/src/main/java/org/apache/ignite/configuration/schemas/table/ColumnTypeConfigurationSchema.java} (73%)
 copy modules/{configuration-annotation-processor/src/test/java/org/apache/ignite/configuration/sample/NodeConfigurationSchema.java => api/src/main/java/org/apache/ignite/configuration/schemas/table/IndexColumnConfigurationSchema.java} (75%)
 copy modules/api/src/main/java/org/apache/ignite/configuration/schemas/table/{TableConfigurationSchema.java => TableIndexConfigurationSchema.java} (60%)
 create mode 100644 modules/metastorage-server/src/main/java/org/apache/ignite/internal/metastorage/server/AbstractCondition.java
 create mode 100644 modules/metastorage-server/src/main/java/org/apache/ignite/internal/metastorage/server/Condition.java
 create mode 100644 modules/metastorage-server/src/main/java/org/apache/ignite/internal/metastorage/server/Operation.java
 create mode 100644 modules/metastorage-server/src/main/java/org/apache/ignite/internal/metastorage/server/RevisionCondition.java
 create mode 100644 modules/metastorage-server/src/main/java/org/apache/ignite/internal/metastorage/server/ValueCondition.java
 create mode 100644 modules/metastorage-server/src/test/java/org/apache/ignite/internal/metastorage/server/RevisionConditionTest.java
 create mode 100644 modules/metastorage-server/src/test/java/org/apache/ignite/internal/metastorage/server/ValueConditionTest.java
 create mode 100644 modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/watch/AggregatedWatch.java
 create mode 100644 modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/watch/KeyCriterion.java
 create mode 100644 modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/watch/WatchAggregator.java
 create mode 100644 modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/WatchAggregatorTest.java

[ignite-3] 07/08: IGNITE-14389 Implemented cursor for ranges and watches.

Posted by ag...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

agura pushed a commit to branch ignite-14389
in repository https://gitbox.apache.org/repos/asf/ignite-3.git

commit c465eff81ec2ff4ba6d084dc48d2d5b15b72a2d9
Author: Andrey Gura <ag...@apache.org>
AuthorDate: Thu Apr 29 02:44:00 2021 +0300

    IGNITE-14389 Implemented cursor for ranges and watches.
---
 .../ignite/internal/metastorage/server/Entry.java  |  17 +
 .../internal/metastorage/server/EntryEvent.java    |  58 +++
 .../metastorage/server/KeyValueStorage.java        |  33 +-
 .../server/SimpleInMemoryKeyValueStorage.java      | 333 +++++++++------
 .../ignite/internal/metastorage/server/Value.java  |  17 +
 .../ignite/internal/metastorage/server/Watch.java  |  45 ---
 .../internal/metastorage/server/WatchEvent.java    |  54 +++
 .../internal/metastorage/server/Watcher.java       |  13 -
 .../internal/metastorage/server/WatcherImpl.java   |  58 ---
 .../server/SimpleInMemoryKeyValueStorageTest.java  | 446 ++++++++++++++++++---
 10 files changed, 764 insertions(+), 310 deletions(-)

diff --git a/modules/metastorage-server/src/main/java/org/apache/ignite/internal/metastorage/server/Entry.java b/modules/metastorage-server/src/main/java/org/apache/ignite/internal/metastorage/server/Entry.java
index 263a88b..87b5471 100644
--- a/modules/metastorage-server/src/main/java/org/apache/ignite/internal/metastorage/server/Entry.java
+++ b/modules/metastorage-server/src/main/java/org/apache/ignite/internal/metastorage/server/Entry.java
@@ -1,3 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
 package org.apache.ignite.internal.metastorage.server;
 
 import org.jetbrains.annotations.NotNull;
diff --git a/modules/metastorage-server/src/main/java/org/apache/ignite/internal/metastorage/server/EntryEvent.java b/modules/metastorage-server/src/main/java/org/apache/ignite/internal/metastorage/server/EntryEvent.java
new file mode 100644
index 0000000..554a3a7
--- /dev/null
+++ b/modules/metastorage-server/src/main/java/org/apache/ignite/internal/metastorage/server/EntryEvent.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.metastorage.server;
+
+/**
+ * Represents a change event for a particular key and entry.
+ */
+public class EntryEvent {
+    /** Old (previous) entry. */
+    private final Entry oldEntry;
+
+    /** New (current) entry. */
+    private final Entry entry;
+
+    /**
+     * Constructs event with given old and new entries.
+     *
+     * @param oldEntry Old entry.
+     * @param curEntry New entry.
+     */
+    EntryEvent(Entry oldEntry, Entry curEntry) {
+        this.oldEntry = oldEntry;
+        this.entry = curEntry;
+    }
+
+    /**
+     * Returns old entry.
+     *
+     * @return Old entry.
+     */
+    public Entry oldEntry() {
+        return oldEntry;
+    }
+
+    /**
+     * Returns new entry.
+     *
+     * @return New entry.
+     */
+    public Entry entry() {
+        return entry;
+    }
+}
diff --git a/modules/metastorage-server/src/main/java/org/apache/ignite/internal/metastorage/server/KeyValueStorage.java b/modules/metastorage-server/src/main/java/org/apache/ignite/internal/metastorage/server/KeyValueStorage.java
index 526e4fb..5d6da44 100644
--- a/modules/metastorage-server/src/main/java/org/apache/ignite/internal/metastorage/server/KeyValueStorage.java
+++ b/modules/metastorage-server/src/main/java/org/apache/ignite/internal/metastorage/server/KeyValueStorage.java
@@ -1,13 +1,28 @@
-package org.apache.ignite.internal.metastorage.server;
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
 
-import org.jetbrains.annotations.NotNull;
+package org.apache.ignite.internal.metastorage.server;
 
 import java.util.Collection;
-import java.util.Iterator;
 import java.util.List;
+import org.apache.ignite.metastorage.common.Cursor;
+import org.jetbrains.annotations.NotNull;
 
 public interface KeyValueStorage {
-
     long revision();
 
     long updateCounter();
@@ -44,11 +59,15 @@ public interface KeyValueStorage {
     @NotNull
     Collection<Entry> getAndRemoveAll(List<byte[]> keys);
 
-    Iterator<Entry> range(byte[] keyFrom, byte[] keyTo);
+    Cursor<Entry> range(byte[] keyFrom, byte[] keyTo);
+
+    Cursor<Entry> range(byte[] keyFrom, byte[] keyTo, long revUpperBound);
+
+    Cursor<WatchEvent> watch(byte[] keyFrom, byte[] keyTo, long rev);
 
-    Iterator<Entry> iterate(byte[] key);
+    Cursor<WatchEvent> watch(byte[] key, long rev);
 
-    //Iterator<Entry> iterate(long rev);
+    Cursor<WatchEvent> watch(Collection<byte[]> keys, long rev);
 
     void compact();
 }
diff --git a/modules/metastorage-server/src/main/java/org/apache/ignite/internal/metastorage/server/SimpleInMemoryKeyValueStorage.java b/modules/metastorage-server/src/main/java/org/apache/ignite/internal/metastorage/server/SimpleInMemoryKeyValueStorage.java
index 32f720e..b37c96a 100644
--- a/modules/metastorage-server/src/main/java/org/apache/ignite/internal/metastorage/server/SimpleInMemoryKeyValueStorage.java
+++ b/modules/metastorage-server/src/main/java/org/apache/ignite/internal/metastorage/server/SimpleInMemoryKeyValueStorage.java
@@ -1,8 +1,34 @@
-package org.apache.ignite.internal.metastorage.server;
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
 
-import java.util.*;
-import java.util.function.Consumer;
+package org.apache.ignite.internal.metastorage.server;
 
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Comparator;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.NavigableMap;
+import java.util.NoSuchElementException;
+import java.util.TreeMap;
+import java.util.TreeSet;
+import java.util.function.Predicate;
 import org.apache.ignite.metastorage.common.Cursor;
 import org.jetbrains.annotations.NotNull;
 import org.jetbrains.annotations.TestOnly;
@@ -13,15 +39,11 @@ import static org.apache.ignite.internal.metastorage.server.Value.TOMBSTONE;
  * WARNING: Only for test purposes and only for non-distributed (one static instance) storage.
  */
 public class SimpleInMemoryKeyValueStorage implements KeyValueStorage {
-    private static final Comparator<byte[]> LEXICOGRAPHIC_COMPARATOR = Arrays::compare;
+    private static final Comparator<byte[]> CMP = Arrays::compare;
 
     private static final long LATEST_REV = -1;
 
-    private final Watcher watcher;
-
-    private final List<Cursor<Entry>> rangeCursors = new ArrayList<>();
-
-    private NavigableMap<byte[], List<Long>> keysIdx = new TreeMap<>(LEXICOGRAPHIC_COMPARATOR);
+    private NavigableMap<byte[], List<Long>> keysIdx = new TreeMap<>(CMP);
 
     private NavigableMap<Long, NavigableMap<byte[], Value>> revsIdx = new TreeMap<>();
 
@@ -31,10 +53,6 @@ public class SimpleInMemoryKeyValueStorage implements KeyValueStorage {
 
     private final Object mux = new Object();
 
-    public SimpleInMemoryKeyValueStorage(Watcher watcher) {
-        this.watcher = watcher;
-    }
-
     @Override public long revision() {
         return rev;
     }
@@ -141,9 +159,7 @@ public class SimpleInMemoryKeyValueStorage implements KeyValueStorage {
 
             List<byte[]> vals = new ArrayList<>(keys.size());
 
-            for (int i = 0; i < keys.size(); i++) {
-                byte[] key = keys.get(i);
-
+            for (byte[] key : keys) {
                 Entry e = doGet(key, LATEST_REV, false);
 
                 if (e.empty() || e.tombstone())
@@ -169,9 +185,7 @@ public class SimpleInMemoryKeyValueStorage implements KeyValueStorage {
 
             List<byte[]> vals = new ArrayList<>(keys.size());
 
-            for (int i = 0; i < keys.size(); i++) {
-                byte[] key = keys.get(i);
-
+            for (byte[] key : keys) {
                 Entry e = doGet(key, LATEST_REV, false);
 
                 res.add(e);
@@ -190,90 +204,45 @@ public class SimpleInMemoryKeyValueStorage implements KeyValueStorage {
         return res;
     }
 
-    @Override
-    public Iterator<Entry> range(byte[] keyFrom, byte[] keyTo) {
-        return null;
+    @Override public Cursor<Entry> range(byte[] keyFrom, byte[] keyTo) {
+        return new RangeCursor(keyFrom, keyTo, rev);
     }
 
-    @Override public Iterator<Entry> iterate(byte[] keyFrom) {
-        synchronized (mux) {
-            NavigableMap<byte[], List<Long>> tailMap = keysIdx.tailMap(keyFrom, true);
-
-            final Iterator<Map.Entry<byte[], List<Long>>> it = tailMap.entrySet().iterator();
-
-            return new Iterator<>() {
-                private Map.Entry<byte[], List<Long>> curr;
-                private boolean hasNext;
-
-                private void advance() {
-                    if (it.hasNext()) {
-                        Map.Entry<byte[], List<Long>> e = it.next();
-
-                        byte[] key = e.getKey();
-
-                        if (!isPrefix(keyFrom, key))
-                            hasNext = false;
-                        else {
-                            curr = e;
-
-                            hasNext = true;
-                        }
-                    } else
-                        hasNext = false;
-                }
-
-                @Override
-                public boolean hasNext() {
-                    synchronized (mux) {
-                        if (curr == null)
-                            advance();
-
-                        return hasNext;
-                    }
-                }
-
-                @Override
-                public Entry next() {
-                    synchronized (mux) {
-                        if (!hasNext())
-                            throw new NoSuchElementException();
-
-                        Map.Entry<byte[], List<Long>> e = curr;
-
-                        curr = null;
+    @Override public Cursor<Entry> range(byte[] keyFrom, byte[] keyTo, long revUpperBound) {
+        return new RangeCursor(keyFrom, keyTo, revUpperBound);
+    }
 
-                        byte[] key = e.getKey();
+    @Override public Cursor<WatchEvent> watch(byte[] keyFrom, byte[] keyTo, long rev) {
+        assert keyFrom != null;
+        assert rev > 0;
 
-                        List<Long> revs = e.getValue();
+        return new WatchCursor(rev, k ->
+            CMP.compare(keyFrom, k) <= 0 && (keyTo == null || CMP.compare(k, keyTo) < 0)
+        );
+    }
 
-                        long rev = revs == null || revs.isEmpty() ? 0 : lastRevision(revs);
+    @Override public Cursor<WatchEvent> watch(byte[] key, long rev) {
+        assert key != null;
+        assert rev > 0;
 
-                        if (rev == 0) {
-                            throw new IllegalStateException("rev == 0");
-                            //return new AbstractMap.SimpleImmutableEntry<>(key, null);
-                        }
+        return new WatchCursor(rev, k -> CMP.compare(k, key) == 0);
+    }
 
-                        NavigableMap<byte[], Value> vals = revsIdx.get(rev);
+    @Override public Cursor<WatchEvent> watch(Collection<byte[]> keys, long rev) {
+        assert keys != null && !keys.isEmpty();
+        assert rev > 0;
 
-                        if (vals == null || vals.isEmpty()) {
-                            throw new IllegalStateException("vals == null || vals.isEmpty()");
-                            //return new AbstractMap.SimpleImmutableEntry<>(key, null);
-                        }
+        TreeSet<byte[]> keySet = new TreeSet<>(CMP);
 
-                        Value val = vals.get(key);
+        keySet.addAll(keys);
 
-                        return val.tombstone() ?
-                                Entry.tombstone(key, rev, val.updateCounter()) :
-                                new Entry(key, val.bytes(), rev, val.updateCounter());
-                    }
-                }
-            };
-        }
+        return new WatchCursor(rev, keySet::contains);
     }
 
+
     @Override public void compact() {
         synchronized (mux) {
-            NavigableMap<byte[], List<Long>> compactedKeysIdx = new TreeMap<>(LEXICOGRAPHIC_COMPARATOR);
+            NavigableMap<byte[], List<Long>> compactedKeysIdx = new TreeMap<>(CMP);
 
             NavigableMap<Long, NavigableMap<byte[], Value>> compactedRevsIdx = new TreeMap<>();
 
@@ -302,7 +271,7 @@ public class SimpleInMemoryKeyValueStorage implements KeyValueStorage {
 
             NavigableMap<byte[], Value> compactedKv = compactedRevsIdx.computeIfAbsent(
                     lastRev,
-                    k -> new TreeMap<>(LEXICOGRAPHIC_COMPARATOR)
+                    k -> new TreeMap<>(CMP)
             );
 
             compactedKv.put(key, lastVal);
@@ -409,7 +378,7 @@ public class SimpleInMemoryKeyValueStorage implements KeyValueStorage {
         revs.add(curRev);
 
         // Update revsIdx.
-        NavigableMap<byte[], Value> entries = new TreeMap<>(LEXICOGRAPHIC_COMPARATOR);
+        NavigableMap<byte[], Value> entries = new TreeMap<>(CMP);
 
         Value val = new Value(bytes, curUpdCntr);
 
@@ -423,7 +392,7 @@ public class SimpleInMemoryKeyValueStorage implements KeyValueStorage {
     private long doPutAll(long curRev, List<byte[]> keys, List<byte[]> bytesList) {
         synchronized (mux) {
             // Update revsIdx.
-            NavigableMap<byte[], Value> entries = new TreeMap<>(LEXICOGRAPHIC_COMPARATOR);
+            NavigableMap<byte[], Value> entries = new TreeMap<>(CMP);
 
             for (int i = 0; i < keys.size(); i++) {
                 byte[] key = keys.get(i);
@@ -452,19 +421,6 @@ public class SimpleInMemoryKeyValueStorage implements KeyValueStorage {
         }
     }
 
-
-    private static boolean isPrefix(byte[] pref, byte[] term) {
-        if (pref.length > term.length)
-            return false;
-
-        for (int i = 0; i < pref.length - 1; i++) {
-            if (pref[i] != term[i])
-                return false;
-        }
-
-        return true;
-    }
-
     private static long lastRevision(List<Long> revs) {
         return revs.get(revs.size() - 1);
     }
@@ -481,55 +437,184 @@ public class SimpleInMemoryKeyValueStorage implements KeyValueStorage {
         private final byte[] keyFrom;
         private final byte[] keyTo;
         private final long rev;
-        private byte[] curKey;
+        private Entry nextRetEntry;
+        private byte[] lastRetKey;
+        private boolean finished;
 
-        public RangeCursor(byte[] keyFrom, byte[] keyTo, long rev) {
+        RangeCursor(byte[] keyFrom, byte[] keyTo, long rev) {
             this.keyFrom = keyFrom;
             this.keyTo = keyTo;
             this.rev = rev;
         }
 
         @Override public void close() throws Exception {
-
+            // TODO: implement.
         }
 
         @NotNull
         @Override public Iterator<Entry> iterator() {
-            return new Iterator<Entry>() {
+            return new Iterator<>() {
                 @Override public boolean hasNext() {
                     synchronized (mux) {
-                        byte[] key = keysIdx.ceilingKey(curKey);
+                        while (true) {
+                            if (finished)
+                                return false;
+
+                            if (nextRetEntry != null)
+                                return true;
 
-                        return key != null;
+                            byte[] key = lastRetKey;
+
+                            while (!finished || nextRetEntry == null) {
+                                Map.Entry<byte[], List<Long>> e =
+                                        key == null ? keysIdx.ceilingEntry(keyFrom) : keysIdx.higherEntry(key);
+
+                                if (e == null) {
+                                    finished = true;
+
+                                    break;
+                                }
+
+                                key = e.getKey();
+
+                                if (keyTo != null && CMP.compare(key, keyTo) >= 0) {
+                                    finished = true;
+
+                                    break;
+                                }
+
+                                List<Long> revs = e.getValue();
+
+                                assert revs != null && !revs.isEmpty() :
+                                        "Revisions should not be empty: [revs=" + revs + ']';
+
+                                long lastRev = maxRevision(revs, rev);
+
+                                if (lastRev == -1)
+                                    continue;
+
+                                Entry entry = doGetValue(key, lastRev);
+
+                                assert !entry.empty() : "Iterator should not return empty entry.";
+
+                                nextRetEntry = entry;
+
+                                break;
+                            }
+                        }
                     }
                 }
 
                 @Override public Entry next() {
                     synchronized (mux) {
-                        Map.Entry<byte[], List<Long>> e = keysIdx.ceilingEntry(curKey);
-
-                        if (e == null)
-                            throw new NoSuchElementException();
+                        while (true) {
+                            if (finished)
+                                throw new NoSuchElementException();
 
-                        List<Long> revs = e.getValue();
+                            if (nextRetEntry != null) {
+                                Entry e = nextRetEntry;
 
-                        assert revs != null && !revs.isEmpty() :
-                                "Revisions should not be empty: [revs=" + revs + ']';
+                                nextRetEntry = null;
 
-                        //lastRevision(re)
+                                lastRetKey = e.key();
 
-                        return null;
+                                return e;
+                            } else
+                                hasNext();
+                        }
                     }
                 }
             };
         }
+    }
 
-        @Override public void forEach(Consumer<? super Entry> action) {
-            Cursor.super.forEach(action);
+    private class WatchCursor implements Cursor<WatchEvent> {
+        private final Predicate<byte[]> p;
+        private long lastRetRev;
+        private long nextRetRev = -1;
+
+        WatchCursor(long rev, Predicate<byte[]> p) {
+            this.p = p;
+            this.lastRetRev = rev - 1;
         }
 
-        @Override public Spliterator<Entry> spliterator() {
-            return Cursor.super.spliterator();
+        @Override public void close() throws Exception {
+            // TODO: implement
+        }
+
+        @NotNull
+        @Override public Iterator<WatchEvent> iterator() {
+            return new Iterator<>() {
+                @Override public boolean hasNext() {
+                    synchronized (mux) {
+                        if (nextRetRev != -1)
+                            return true;
+
+                        while (true) {
+                            long curRev = lastRetRev + 1;
+
+                            NavigableMap<byte[], Value> entries = revsIdx.get(curRev);
+
+                            if (entries == null)
+                                return false;
+
+                            for (byte[] key : entries.keySet()) {
+                                if (p.test(key)) {
+                                    nextRetRev = curRev;
+
+                                    return true;
+                                }
+                            }
+
+                            lastRetRev++;
+                        }
+                    }
+                }
+
+                @Override public WatchEvent next() {
+                    synchronized (mux) {
+                        while (true) {
+                            if (nextRetRev != -1) {
+                                NavigableMap<byte[], Value> entries = revsIdx.get(nextRetRev);
+
+                                if (entries == null)
+                                    return null;
+
+                                List<EntryEvent> evts = new ArrayList<>(entries.size());
+
+                                for (Map.Entry<byte[], Value> e : entries.entrySet()) {
+                                    byte[] key = e.getKey();
+
+                                    Value val = e.getValue();
+
+                                    if (p.test(key)) {
+                                        Entry newEntry;
+
+                                        if (val.tombstone())
+                                            newEntry = Entry.tombstone(key, nextRetRev, val.updateCounter());
+                                        else
+                                            newEntry = new Entry(key, val.bytes(), nextRetRev, val.updateCounter());
+
+                                        Entry oldEntry = doGet(key, nextRetRev - 1, false);
+
+                                        evts.add(new EntryEvent(oldEntry, newEntry));
+                                    }
+                                }
+
+                                if (evts.isEmpty())
+                                    continue;
+
+                                lastRetRev = nextRetRev;
+
+                                nextRetRev = -1;
+
+                                return new WatchEvent(evts);
+                            } else if (!hasNext())
+                                return null;
+                        }
+                    }
+                }
+            };
         }
     }
 }
diff --git a/modules/metastorage-server/src/main/java/org/apache/ignite/internal/metastorage/server/Value.java b/modules/metastorage-server/src/main/java/org/apache/ignite/internal/metastorage/server/Value.java
index 250a5ea..a438fd4 100644
--- a/modules/metastorage-server/src/main/java/org/apache/ignite/internal/metastorage/server/Value.java
+++ b/modules/metastorage-server/src/main/java/org/apache/ignite/internal/metastorage/server/Value.java
@@ -1,3 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
 package org.apache.ignite.internal.metastorage.server;
 
 import org.jetbrains.annotations.NotNull;
diff --git a/modules/metastorage-server/src/main/java/org/apache/ignite/internal/metastorage/server/Watch.java b/modules/metastorage-server/src/main/java/org/apache/ignite/internal/metastorage/server/Watch.java
deleted file mode 100644
index 26cfa5c..0000000
--- a/modules/metastorage-server/src/main/java/org/apache/ignite/internal/metastorage/server/Watch.java
+++ /dev/null
@@ -1,45 +0,0 @@
-package org.apache.ignite.internal.metastorage.server;
-
-import org.jetbrains.annotations.Nullable;
-
-import java.util.Arrays;
-import java.util.Comparator;
-
-public class Watch {
-    private static final Comparator<byte[]> CMP = Arrays::compare;
-
-    private static final long ANY_REVISION = -1;
-
-    @Nullable
-    private byte[] startKey;
-
-    @Nullable
-    private byte[] endKey;
-
-    long rev = ANY_REVISION;
-
-    public void startKey(byte[] startKey) {
-        this.startKey = startKey;
-    }
-
-    public void endKey(byte[] endKey) {
-        this.endKey = endKey;
-    }
-
-    public void revision(long rev) {
-        this.rev = rev;
-    }
-
-    public void notify(Entry e) {
-        if (startKey != null && CMP.compare(e.key(), startKey) < 0)
-            return;
-
-        if (endKey != null && CMP.compare(e.key(), endKey) > 0)
-            return;
-
-        if (rev != ANY_REVISION && e.revision() <= rev)
-            return;
-
-        System.out.println("Entry: key=" + new String(e.key()) + ", value=" + new String(e.value()) + ", rev=" + e.revision());
-    }
-}
diff --git a/modules/metastorage-server/src/main/java/org/apache/ignite/internal/metastorage/server/WatchEvent.java b/modules/metastorage-server/src/main/java/org/apache/ignite/internal/metastorage/server/WatchEvent.java
new file mode 100644
index 0000000..561f203
--- /dev/null
+++ b/modules/metastorage-server/src/main/java/org/apache/ignite/internal/metastorage/server/WatchEvent.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.metastorage.server;
+
+import java.util.Collection;
+import java.util.List;
+
+public class WatchEvent {
+    private final List<EntryEvent> entryEvts;
+
+    private final boolean batch;
+
+    /**
+     * Constructs a watch event with given entry events collection.
+     *
+     * @param entryEvts Events for entries corresponding to an update under one revision.
+     */
+    public WatchEvent(List<EntryEvent> entryEvts) {
+        assert entryEvts != null && !entryEvts.isEmpty();
+
+        this.batch = entryEvts.size() > 1;
+        this.entryEvts = entryEvts;
+    }
+
+    public boolean batch() {
+        return batch;
+    }
+
+    public Collection<EntryEvent> entryEvents() {
+        return entryEvts;
+    }
+
+    public EntryEvent entryEvent() {
+        if (batch)
+            throw new IllegalStateException("Watch event represents a batch of events.");
+
+        return entryEvts.get(0);
+    }
+}
diff --git a/modules/metastorage-server/src/main/java/org/apache/ignite/internal/metastorage/server/Watcher.java b/modules/metastorage-server/src/main/java/org/apache/ignite/internal/metastorage/server/Watcher.java
deleted file mode 100644
index 5516d06..0000000
--- a/modules/metastorage-server/src/main/java/org/apache/ignite/internal/metastorage/server/Watcher.java
+++ /dev/null
@@ -1,13 +0,0 @@
-package org.apache.ignite.internal.metastorage.server;
-
-import org.jetbrains.annotations.NotNull;
-
-public interface Watcher {
-    void register(@NotNull Watch watch);
-
-    void notify(@NotNull Entry e);
-
-    //TODO: implement
-    void cancel(@NotNull Watch watch);
-}
-
diff --git a/modules/metastorage-server/src/main/java/org/apache/ignite/internal/metastorage/server/WatcherImpl.java b/modules/metastorage-server/src/main/java/org/apache/ignite/internal/metastorage/server/WatcherImpl.java
deleted file mode 100644
index dc126a0..0000000
--- a/modules/metastorage-server/src/main/java/org/apache/ignite/internal/metastorage/server/WatcherImpl.java
+++ /dev/null
@@ -1,58 +0,0 @@
-package org.apache.ignite.internal.metastorage.server;
-
-import org.jetbrains.annotations.NotNull;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.TimeUnit;
-
-public class WatcherImpl implements Watcher {
-    private final BlockingQueue<Entry> queue = new LinkedBlockingQueue<>();
-
-    private final List<Watch> watches = new ArrayList<>();
-
-    private volatile boolean stop;
-
-    private final Object mux = new Object();
-
-    @Override public void register(@NotNull Watch watch) {
-        synchronized (mux) {
-            watches.add(watch);
-        }
-    }
-
-    @Override public void notify(@NotNull Entry e) {
-        queue.offer(e);
-    }
-
-    @Override
-    public void cancel(@NotNull Watch watch) {
-        throw new UnsupportedOperationException("Not implemented yet.");
-    }
-
-    public void shutdown() {
-        stop = true;
-    }
-
-    private class WatcherWorker implements Runnable {
-        @Override public void run() {
-            while (!stop) {
-                try {
-                    Entry e = queue.poll(100, TimeUnit.MILLISECONDS);
-
-                    if (e != null) {
-                        synchronized (mux) {
-                            watches.forEach(w -> w.notify(e));
-                        }
-                    }
-                }
-                catch (InterruptedException interruptedException) {
-                    // No-op.
-                }
-            }
-        }
-    }
-}
-
diff --git a/modules/metastorage-server/src/test/java/org/apache/ignite/internal/metastorage/server/SimpleInMemoryKeyValueStorageTest.java b/modules/metastorage-server/src/test/java/org/apache/ignite/internal/metastorage/server/SimpleInMemoryKeyValueStorageTest.java
index 4a73137..27df790 100644
--- a/modules/metastorage-server/src/test/java/org/apache/ignite/internal/metastorage/server/SimpleInMemoryKeyValueStorageTest.java
+++ b/modules/metastorage-server/src/test/java/org/apache/ignite/internal/metastorage/server/SimpleInMemoryKeyValueStorageTest.java
@@ -1,17 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
 package org.apache.ignite.internal.metastorage.server;
 
 import java.util.Collection;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
-import java.util.TreeMap;
-import java.util.function.Function;
+import java.util.NoSuchElementException;
 import java.util.stream.Collectors;
+import org.apache.ignite.metastorage.common.Cursor;
 import org.apache.ignite.metastorage.common.Key;
-import org.jetbrains.annotations.NotNull;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 
+import static java.util.function.Function.identity;
 import static org.junit.jupiter.api.Assertions.*;
 
 class SimpleInMemoryKeyValueStorageTest {
@@ -19,7 +36,7 @@ class SimpleInMemoryKeyValueStorageTest {
 
     @BeforeEach
     public void setUp() {
-        storage = new SimpleInMemoryKeyValueStorage(new NoOpWatcher());
+        storage = new SimpleInMemoryKeyValueStorage();
     }
 
     @Test
@@ -91,7 +108,7 @@ class SimpleInMemoryKeyValueStorageTest {
 
         assertEquals(4, entries.size());
 
-        Map<Key, Entry> map =  entries.stream().collect(Collectors.toMap(e -> new Key(e.key()), Function.identity()));
+        Map<Key, Entry> map =  entries.stream().collect(Collectors.toMap(e -> new Key(e.key()), identity()));
 
         // Test regular put value.
         Entry e1 = map.get(new Key(key1));
@@ -166,7 +183,7 @@ class SimpleInMemoryKeyValueStorageTest {
 
         assertEquals(4, entries.size());
 
-        Map<Key, Entry> map =  entries.stream().collect(Collectors.toMap(e -> new Key(e.key()), Function.identity()));
+        Map<Key, Entry> map =  entries.stream().collect(Collectors.toMap(e -> new Key(e.key()), identity()));
 
         // Test regular put value.
         Entry e1 = map.get(new Key(key1));
@@ -204,7 +221,7 @@ class SimpleInMemoryKeyValueStorageTest {
 
         assertEquals(4, entries.size());
 
-        map =  entries.stream().collect(Collectors.toMap(e -> new Key(e.key()), Function.identity()));
+        map =  entries.stream().collect(Collectors.toMap(e -> new Key(e.key()), identity()));
 
         // Test regular put value.
         e1 = map.get(new Key(key1));
@@ -308,7 +325,7 @@ class SimpleInMemoryKeyValueStorageTest {
 
         assertEquals(4, entries.size());
 
-        Map<Key, Entry> map =  entries.stream().collect(Collectors.toMap(e -> new Key(e.key()), Function.identity()));
+        Map<Key, Entry> map =  entries.stream().collect(Collectors.toMap(e -> new Key(e.key()), identity()));
 
         // Test regular put value.
         Entry e1 = map.get(new Key(key1));
@@ -382,7 +399,7 @@ class SimpleInMemoryKeyValueStorageTest {
 
         assertEquals(3, entries.size());
 
-        Map<Key, Entry> map =  entries.stream().collect(Collectors.toMap(e -> new Key(e.key()), Function.identity()));
+        Map<Key, Entry> map =  entries.stream().collect(Collectors.toMap(e -> new Key(e.key()), identity()));
 
         // Test regular put value.
         Entry e1 = map.get(new Key(key1));
@@ -417,7 +434,7 @@ class SimpleInMemoryKeyValueStorageTest {
 
         assertEquals(4, entries.size());
 
-        map =  entries.stream().collect(Collectors.toMap(e -> new Key(e.key()), Function.identity()));
+        map =  entries.stream().collect(Collectors.toMap(e -> new Key(e.key()), identity()));
 
         // Test regular put value.
         e1 = map.get(new Key(key1));
@@ -601,7 +618,7 @@ class SimpleInMemoryKeyValueStorageTest {
 
         assertEquals(4, entries.size());
 
-        Map<Key, Entry> map =  entries.stream().collect(Collectors.toMap(e -> new Key(e.key()), Function.identity()));
+        Map<Key, Entry> map =  entries.stream().collect(Collectors.toMap(e -> new Key(e.key()), identity()));
 
         // Test regular put value.
         Entry e1 = map.get(new Key(key1));
@@ -676,7 +693,7 @@ class SimpleInMemoryKeyValueStorageTest {
 
         assertEquals(4, entries.size());
 
-        Map<Key, Entry> map =  entries.stream().collect(Collectors.toMap(e -> new Key(e.key()), Function.identity()));
+        Map<Key, Entry> map =  entries.stream().collect(Collectors.toMap(e -> new Key(e.key()), identity()));
 
         // Test regular put value.
         Entry e1 = map.get(new Key(key1));
@@ -719,7 +736,7 @@ class SimpleInMemoryKeyValueStorageTest {
 
         assertEquals(4, entries.size());
 
-        map =  entries.stream().collect(Collectors.toMap(e -> new Key(e.key()), Function.identity()));
+        map =  entries.stream().collect(Collectors.toMap(e -> new Key(e.key()), identity()));
 
         // Test regular put value.
         e1 = map.get(new Key(key1));
@@ -987,60 +1004,377 @@ class SimpleInMemoryKeyValueStorageTest {
     }
 
     @Test
-    public void iterate() {
-        TreeMap<String, String> expFooMap = new TreeMap<>();
-        TreeMap<String, String> expKeyMap = new TreeMap<>();
-        TreeMap<String, String> expZooMap = new TreeMap<>();
+    public void rangeCursor() {
+        byte[] key1 = k(1);
+        byte[] val1 = kv(1, 1);
 
-        fill("foo", storage, expFooMap);
-        fill("key", storage, expKeyMap);
-        fill("zoo", storage, expZooMap);
+        byte[] key2 = k(2);
+        byte[] val2 = kv(2, 2);
 
-        assertEquals(300, storage.revision());
-        assertEquals(300, storage.updateCounter());
+        byte[] key3 = k(3);
+        byte[] val3 = kv(3, 3);
 
-        assertIterate("key", storage, expKeyMap);
-        assertIterate("zoo", storage, expZooMap);
-        assertIterate("foo", storage, expFooMap);
-    }
 
-    private void assertIterate(String pref,  KeyValueStorage storage, TreeMap<String, String> expMap) {
-        Iterator<Entry> it = storage.iterate((pref + "_").getBytes());
-        Iterator<Map.Entry<String, String>> expIt = expMap.entrySet().iterator();
+        assertEquals(0, storage.revision());
+        assertEquals(0, storage.updateCounter());
+
+        storage.putAll(List.of(key1, key2, key3), List.of(val1, val2, val3));
+
+        assertEquals(1, storage.revision());
+        assertEquals(3, storage.updateCounter());
+
+        // Range for latest revision without max bound.
+        Cursor<Entry> cur = storage.range(key1, null);
+
+        Iterator<Entry> it = cur.iterator();
+
+        assertTrue(it.hasNext());
+
+        Entry e1 = it.next();
+
+        assertFalse(e1.empty());
+        assertFalse(e1.tombstone());
+        assertArrayEquals(key1, e1.key());
+        assertArrayEquals(val1, e1.value());
+        assertEquals(1, e1.revision());
+        assertEquals(1, e1.updateCounter());
+
+        assertTrue(it.hasNext());
+
+        Entry e2 = it.next();
 
-        // Order.
-        while (it.hasNext()) {
-            Entry entry = it.next();
-            Map.Entry<String, String> expEntry = expIt.next();
+        assertFalse(e2.empty());
+        assertFalse(e2.tombstone());
+        assertArrayEquals(key2, e2.key());
+        assertArrayEquals(val2, e2.value());
+        assertEquals(1, e2.revision());
+        assertEquals(2, e2.updateCounter());
+
+        // Deliberately don't call it.hasNext()
+
+        Entry e3 = it.next();
+
+        assertFalse(e3.empty());
+        assertFalse(e3.tombstone());
+        assertArrayEquals(key3, e3.key());
+        assertArrayEquals(val3, e3.value());
+        assertEquals(1, e3.revision());
+        assertEquals(3, e3.updateCounter());
+
+        assertFalse(it.hasNext());
 
-            assertEquals(expEntry.getKey(), new String(entry.key()));
-            assertEquals(expEntry.getValue(), new String(entry.value()));
+        try {
+            it.next();
+
+            fail();
+        }
+        catch (NoSuchElementException e) {
+            System.out.println();
+            // No-op.
         }
 
-        // Range boundaries.
-        it = storage.iterate((pref + '_').getBytes());
+        // Range for latest revision with max bound.
+        cur = storage.range(key1, key3);
+
+        it = cur.iterator();
+
+        assertTrue(it.hasNext());
+
+        e1 = it.next();
+
+        assertFalse(e1.empty());
+        assertFalse(e1.tombstone());
+        assertArrayEquals(key1, e1.key());
+        assertArrayEquals(val1, e1.value());
+        assertEquals(1, e1.revision());
+        assertEquals(1, e1.updateCounter());
 
-        while (it.hasNext()) {
-            Entry entry = it.next();
+        assertTrue(it.hasNext());
 
-            assertTrue(expMap.containsKey(new String(entry.key())));
+        e2 = it.next();
+
+        assertFalse(e2.empty());
+        assertFalse(e2.tombstone());
+        assertArrayEquals(key2, e2.key());
+        assertArrayEquals(val2, e2.value());
+        assertEquals(1, e2.revision());
+        assertEquals(2, e2.updateCounter());
+
+        assertFalse(it.hasNext());
+
+        try {
+            it.next();
+
+            fail();
+        }
+        catch (NoSuchElementException e) {
+            System.out.println();
+            // No-op.
         }
     }
 
-    private static void fill(String pref, KeyValueStorage storage, TreeMap<String, String> expMap) {
-        for (int i = 0; i < 100; i++) {
-            String keyStr = pref + '_' + i;
+    @Test
+    public void watchCursorForRange() {
+        byte[] key1 = k(1);
+        byte[] val1_1 = kv(1, 11);
 
-            String valStr = "val_" + i;
+        byte[] key2 = k(2);
+        byte[] val2_1 = kv(2, 21);
+        byte[] val2_2 = kv(2, 22);
+
+        byte[] key3 = k(3);
+        byte[] val3_1 = kv(3, 31);
 
-            expMap.put(keyStr, valStr);
+        assertEquals(0, storage.revision());
+        assertEquals(0, storage.updateCounter());
 
-            byte[] key = keyStr.getBytes();
+        // Watch for all updates starting from revision 2.
+        Cursor<WatchEvent> cur = storage.watch(key1, null, 2);
 
-            byte[] val = valStr.getBytes();
+        Iterator<WatchEvent> it = cur.iterator();
 
-            storage.getAndPut(key, val);
-        }
+        assertFalse(it.hasNext());
+        assertNull(it.next());
+
+        storage.putAll(List.of(key1, key2), List.of(val1_1, val2_1));
+
+        assertEquals(1, storage.revision());
+        assertEquals(2, storage.updateCounter());
+
+        // Revision is less than 2.
+        assertFalse(it.hasNext());
+        assertNull(it.next());
+
+        storage.putAll(List.of(key2, key3), List.of(val2_2, val3_1));
+
+        assertEquals(2, storage.revision());
+        assertEquals(4, storage.updateCounter());
+
+        // Revision is 2.
+        assertTrue(it.hasNext());
+
+        WatchEvent watchEvent = it.next();
+
+        assertTrue(watchEvent.batch());
+
+        Map<Key, EntryEvent> map = watchEvent.entryEvents().stream()
+                .collect(Collectors.toMap(evt -> new Key(evt.entry().key()), identity()));
+
+        assertEquals(2, map.size());
+
+        // First update under revision.
+        EntryEvent e2 = map.get(new Key(key2));
+
+        assertNotNull(e2);
+
+        Entry oldEntry2 = e2.oldEntry();
+
+        assertFalse(oldEntry2.empty());
+        assertFalse(oldEntry2.tombstone());
+        assertEquals(1, oldEntry2.revision());
+        assertEquals(2, oldEntry2.updateCounter());
+        assertArrayEquals(key2, oldEntry2.key());
+        assertArrayEquals(val2_1, oldEntry2.value());
+
+        Entry newEntry2 = e2.entry();
+
+        assertFalse(newEntry2.empty());
+        assertFalse(newEntry2.tombstone());
+        assertEquals(2, newEntry2.revision());
+        assertEquals(3, newEntry2.updateCounter());
+        assertArrayEquals(key2, newEntry2.key());
+        assertArrayEquals(val2_2, newEntry2.value());
+
+        // Second update under revision.
+        EntryEvent e3 = map.get(new Key(key3));
+
+        assertNotNull(e3);
+
+        Entry oldEntry3 = e3.oldEntry();
+
+        assertTrue(oldEntry3.empty());
+        assertFalse(oldEntry3.tombstone());
+        assertArrayEquals(key3, oldEntry3.key());
+
+        Entry newEntry3 = e3.entry();
+
+        assertFalse(newEntry3.empty());
+        assertFalse(newEntry3.tombstone());
+        assertEquals(2, newEntry3.revision());
+        assertEquals(4, newEntry3.updateCounter());
+        assertArrayEquals(key3, newEntry3.key());
+        assertArrayEquals(val3_1, newEntry3.value());
+
+        assertFalse(it.hasNext());
+
+        storage.remove(key1);
+
+        assertTrue(it.hasNext());
+
+        watchEvent = it.next();
+
+        assertFalse(watchEvent.batch());
+
+        EntryEvent e1 = watchEvent.entryEvent();
+
+        Entry oldEntry1 = e1.oldEntry();
+
+        assertFalse(oldEntry1.empty());
+        assertFalse(oldEntry1.tombstone());
+        assertEquals(1, oldEntry1.revision());
+        assertEquals(1, oldEntry1.updateCounter());
+        assertArrayEquals(key1, oldEntry1.key());
+        assertArrayEquals(val1_1, oldEntry1.value());
+
+        Entry newEntry1 = e1.entry();
+
+        assertFalse(newEntry1.empty());
+        assertTrue(newEntry1.tombstone());
+        assertEquals(3, newEntry1.revision());
+        assertEquals(5, newEntry1.updateCounter());
+        assertArrayEquals(key1, newEntry1.key());
+        assertNull(newEntry1.value());
+
+        assertFalse(it.hasNext());
+    }
+
+
+    @Test
+    public void watchCursorForKey() {
+        // Scenario: a single-key watch on key1 starting from revision 1 must report only key1 updates.
+        byte[] key1 = k(1);
+        byte[] val1_1 = kv(1, 11);
+        byte[] val1_2 = kv(1, 12);
+
+        byte[] key2 = k(2);
+        byte[] val2_1 = kv(2, 21);
+        byte[] val2_2 = kv(2, 22);
+
+        assertEquals(0, storage.revision());
+        assertEquals(0, storage.updateCounter());
+
+        Cursor<WatchEvent> cur = storage.watch(key1, 1);
+
+        Iterator<WatchEvent> it = cur.iterator();
+        // Nothing has been written yet: the watch has no events and next() is expected to yield null.
+        assertFalse(it.hasNext());
+        assertNull(it.next());
+
+        storage.putAll(List.of(key1, key2), List.of(val1_1, val2_1));
+
+        assertEquals(1, storage.revision());
+        assertEquals(2, storage.updateCounter());
+
+        assertTrue(it.hasNext());
+
+        WatchEvent watchEvent = it.next();
+
+        assertFalse(watchEvent.batch());
+
+        EntryEvent e1 = watchEvent.entryEvent();
+
+        Entry oldEntry1 = e1.oldEntry();
+
+        assertTrue(oldEntry1.empty());
+        assertFalse(oldEntry1.tombstone());
+
+        Entry newEntry1 = e1.entry();
+
+        assertFalse(newEntry1.empty());
+        assertFalse(newEntry1.tombstone());
+        assertEquals(1, newEntry1.revision());
+        assertEquals(1, newEntry1.updateCounter());
+        assertArrayEquals(key1, newEntry1.key());
+        assertArrayEquals(val1_1, newEntry1.value());
+
+        assertFalse(it.hasNext());
+        // An update that touches only key2 must not produce an event for this watch.
+        storage.put(key2, val2_2);
+
+        assertFalse(it.hasNext());
+        // An update of key1 must be delivered as a single (non-batch) event.
+        storage.put(key1, val1_2);
+
+        assertTrue(it.hasNext());
+
+        watchEvent = it.next();
+
+        assertFalse(watchEvent.batch());
+
+        e1 = watchEvent.entryEvent();
+
+        oldEntry1 = e1.oldEntry();
+        // The old entry must be the value stored by the initial putAll; check oldEntry1 itself, not the stale newEntry1.
+        assertFalse(oldEntry1.empty());
+        assertFalse(oldEntry1.tombstone());
+        assertEquals(1, oldEntry1.revision());
+        assertEquals(1, oldEntry1.updateCounter());
+        assertArrayEquals(key1, oldEntry1.key());
+        assertArrayEquals(val1_1, oldEntry1.value());
+
+        newEntry1 = e1.entry();
+
+        assertFalse(newEntry1.empty());
+        assertFalse(newEntry1.tombstone());
+        assertEquals(3, newEntry1.revision());
+        assertEquals(4, newEntry1.updateCounter());
+        assertArrayEquals(key1, newEntry1.key());
+        assertArrayEquals(val1_2, newEntry1.value());
+
+        assertFalse(it.hasNext());
+    }
+
+    @Test
+    public void watchCursorForKeys() {
+        byte[] key1 = k(1);
+        byte[] val1_1 = kv(1, 11);
+        byte[] val1_2 = kv(1, 12);
+
+        byte[] key2 = k(2);
+        byte[] val2_1 = kv(2, 21);
+        byte[] val2_2 = kv(2, 22);
+
+        byte[] key3 = k(3);
+        byte[] val3_1 = kv(3, 31);
+        byte[] val3_2 = kv(3, 32);
+
+
+        assertEquals(0, storage.revision());
+        assertEquals(0, storage.updateCounter());
+
+        Cursor<WatchEvent> cur = storage.watch(List.of(key1, key2), 1);
+
+        Iterator<WatchEvent> it = cur.iterator();
+
+        assertFalse(it.hasNext());
+        assertNull(it.next());
+
+        storage.putAll(List.of(key1, key2, key3), List.of(val1_1, val2_1, val3_1));
+
+        assertEquals(1, storage.revision());
+        assertEquals(3, storage.updateCounter());
+
+        assertTrue(it.hasNext());
+
+        WatchEvent watchEvent = it.next();
+
+        assertTrue(watchEvent.batch());
+
+        assertFalse(it.hasNext());
+
+        storage.put(key2, val2_2);
+
+        assertTrue(it.hasNext());
+
+        watchEvent = it.next();
+
+        assertFalse(watchEvent.batch());
+
+        assertFalse(it.hasNext());
+
+        storage.put(key3, val3_2);
+
+        assertFalse(it.hasNext());
     }
 
     private static void fill(KeyValueStorage storage, int keySuffix, int num) {
@@ -1055,18 +1389,4 @@ class SimpleInMemoryKeyValueStorageTest {
     private static byte[] kv(int k, int v) {
         return ("key" + k + '_' + "val" + v).getBytes();
     }
-
-    private static class NoOpWatcher implements Watcher {
-        @Override public void register(@NotNull Watch watch) {
-            // No-op.
-        }
-
-        @Override public void notify(@NotNull Entry e) {
-            // No-op.
-        }
-
-        @Override public void cancel(@NotNull Watch watch) {
-            // No-op.
-        }
-    }
 }
\ No newline at end of file

[ignite-3] 05/08: IGNITE-14389 putAll initial (WIP)

Posted by ag...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

agura pushed a commit to branch ignite-14389
in repository https://gitbox.apache.org/repos/asf/ignite-3.git

commit 9bd40870da5a926a1592c97cf53226fc98fde879
Author: Andrey Gura <ag...@apache.org>
AuthorDate: Thu Apr 15 21:18:56 2021 +0300

    IGNITE-14389 putAll initial (WIP)
---
 .../apache/ignite/internal/metastorage/server/KeyValueStorage.java | 2 ++
 .../internal/metastorage/server/SimpleInMemoryKeyValueStorage.java | 7 +++++++
 2 files changed, 9 insertions(+)

diff --git a/modules/metastorage-server/src/main/java/org/apache/ignite/internal/metastorage/server/KeyValueStorage.java b/modules/metastorage-server/src/main/java/org/apache/ignite/internal/metastorage/server/KeyValueStorage.java
index 0596c4a..0f18ece 100644
--- a/modules/metastorage-server/src/main/java/org/apache/ignite/internal/metastorage/server/KeyValueStorage.java
+++ b/modules/metastorage-server/src/main/java/org/apache/ignite/internal/metastorage/server/KeyValueStorage.java
@@ -29,6 +29,8 @@ public interface KeyValueStorage {
     @NotNull
     Entry getAndPut(byte[] key, byte[] value);
 
+    void putAll(List<byte[]> keys, List<byte[]> values);
+
     void remove(byte[] key);
 
     @NotNull
diff --git a/modules/metastorage-server/src/main/java/org/apache/ignite/internal/metastorage/server/SimpleInMemoryKeyValueStorage.java b/modules/metastorage-server/src/main/java/org/apache/ignite/internal/metastorage/server/SimpleInMemoryKeyValueStorage.java
index 8523f51..f532005 100644
--- a/modules/metastorage-server/src/main/java/org/apache/ignite/internal/metastorage/server/SimpleInMemoryKeyValueStorage.java
+++ b/modules/metastorage-server/src/main/java/org/apache/ignite/internal/metastorage/server/SimpleInMemoryKeyValueStorage.java
@@ -63,6 +63,13 @@ public class SimpleInMemoryKeyValueStorage implements KeyValueStorage {
         }
     }
 
+    @Override
+    public void putAll(List<byte[]> keys, List<byte[]> values) {
+        synchronized (mux) {
+
+        }
+    }
+
     @NotNull
     @Override public Entry get(byte[] key) {
         synchronized (mux) {

[ignite-3] 04/08: IGNITE-14389 getAll and tests (WIP)

Posted by ag...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

agura pushed a commit to branch ignite-14389
in repository https://gitbox.apache.org/repos/asf/ignite-3.git

commit c6932ea28e177ed56d30c481fd3d8f9d8ee9bdba
Author: Andrey Gura <ag...@apache.org>
AuthorDate: Tue Apr 13 21:34:37 2021 +0300

    IGNITE-14389 getAll and tests (WIP)
---
 .../metastorage/client/MetaStorageService.java     |   4 +-
 modules/metastorage-server/pom.xml                 |   6 +
 .../metastorage/server/KeyValueStorage.java        |   8 +
 .../server/SimpleInMemoryKeyValueStorage.java      |  36 +++-
 .../server/SimpleInMemoryKeyValueStorageTest.java  | 208 +++++++++++++++++++--
 5 files changed, 244 insertions(+), 18 deletions(-)

diff --git a/modules/metastorage-client/src/main/java/org/apache/ignite/metastorage/client/MetaStorageService.java b/modules/metastorage-client/src/main/java/org/apache/ignite/metastorage/client/MetaStorageService.java
index dfb4868..d4ded44 100644
--- a/modules/metastorage-client/src/main/java/org/apache/ignite/metastorage/client/MetaStorageService.java
+++ b/modules/metastorage-client/src/main/java/org/apache/ignite/metastorage/client/MetaStorageService.java
@@ -52,7 +52,7 @@ public interface MetaStorageService {
      * Retrieves an entry for the given key and the revision upper bound.
      *
      * @param key The key. Couldn't be {@code null}.
-     * @param revUpperBound  The upper bound for entry revisions. Must be positive.
+     * @param revUpperBound The upper bound for entry revisions. Must be positive.
      * @return An entry for the given key and maximum revision limited by {@code revUpperBound}.
      * Couldn't be {@code null}.
      * @throws OperationTimeoutException If the operation is timed out. Will be thrown on getting future result.
@@ -82,7 +82,7 @@ public interface MetaStorageService {
      *
      * @param keys The collection of keys. Couldn't be {@code null} or empty.
      *             Collection elements couldn't be {@code null}.
-     * @param revUpperBound  The upper bound for entry revisions. Must be positive.
+     * @param revUpperBound The upper bound for entry revisions. Must be positive.
      * @return A map of entries for given keys and maximum revision limited by {@code revUpperBound}.
      * Couldn't be {@code null}.
      * @throws OperationTimeoutException If the operation is timed out. Will be thrown on getting future result.
diff --git a/modules/metastorage-server/pom.xml b/modules/metastorage-server/pom.xml
index 3c51fc5..d73d080 100644
--- a/modules/metastorage-server/pom.xml
+++ b/modules/metastorage-server/pom.xml
@@ -40,6 +40,12 @@
         </dependency>
 
         <dependency>
+            <groupId>org.apache.ignite</groupId>
+            <artifactId>metastorage-common</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+
+        <dependency>
             <groupId>org.jetbrains</groupId>
             <artifactId>annotations</artifactId>
         </dependency>
diff --git a/modules/metastorage-server/src/main/java/org/apache/ignite/internal/metastorage/server/KeyValueStorage.java b/modules/metastorage-server/src/main/java/org/apache/ignite/internal/metastorage/server/KeyValueStorage.java
index ead1043..0596c4a 100644
--- a/modules/metastorage-server/src/main/java/org/apache/ignite/internal/metastorage/server/KeyValueStorage.java
+++ b/modules/metastorage-server/src/main/java/org/apache/ignite/internal/metastorage/server/KeyValueStorage.java
@@ -2,7 +2,9 @@ package org.apache.ignite.internal.metastorage.server;
 
 import org.jetbrains.annotations.NotNull;
 
+import java.util.Collection;
 import java.util.Iterator;
+import java.util.List;
 
 public interface KeyValueStorage {
 
@@ -16,6 +18,12 @@ public interface KeyValueStorage {
     @NotNull
     Entry get(byte[] key, long rev);
 
+    @NotNull
+    Collection<Entry> getAll(List<byte[]> keys);
+
+    @NotNull
+    Collection<Entry> getAll(List<byte[]> keys, long revUpperBound);
+
     void put(byte[] key, byte[] value);
 
     @NotNull
diff --git a/modules/metastorage-server/src/main/java/org/apache/ignite/internal/metastorage/server/SimpleInMemoryKeyValueStorage.java b/modules/metastorage-server/src/main/java/org/apache/ignite/internal/metastorage/server/SimpleInMemoryKeyValueStorage.java
index 3700f4a..8523f51 100644
--- a/modules/metastorage-server/src/main/java/org/apache/ignite/internal/metastorage/server/SimpleInMemoryKeyValueStorage.java
+++ b/modules/metastorage-server/src/main/java/org/apache/ignite/internal/metastorage/server/SimpleInMemoryKeyValueStorage.java
@@ -2,6 +2,7 @@ package org.apache.ignite.internal.metastorage.server;
 
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collection;
 import java.util.Comparator;
 import java.util.Iterator;
 import java.util.List;
@@ -78,6 +79,16 @@ public class SimpleInMemoryKeyValueStorage implements KeyValueStorage {
     }
 
     @Override
+    public @NotNull Collection<Entry> getAll(List<byte[]> keys) {
+        return doGetAll(keys, LATEST_REV);
+    }
+
+    @Override
+    public @NotNull Collection<Entry> getAll(List<byte[]> keys, long revUpperBound) {
+        return doGetAll(keys, revUpperBound);
+    }
+
+    @Override
     public void remove(byte[] key) {
         synchronized (mux) {
             Entry e = doGet(key, LATEST_REV, false);
@@ -197,17 +208,17 @@ public class SimpleInMemoryKeyValueStorage implements KeyValueStorage {
             NavigableMap<byte[], List<Long>> compactedKeysIdx,
             NavigableMap<Long, NavigableMap<byte[], Value>> compactedRevsIdx
     ) {
-        Long lrev = lastRevision(revs);
+        Long lastRev = lastRevision(revs);
 
-        NavigableMap<byte[], Value> kv = revsIdx.get(lrev);
+        NavigableMap<byte[], Value> kv = revsIdx.get(lastRev);
 
         Value lastVal = kv.get(key);
 
         if (!lastVal.tombstone()) {
-            compactedKeysIdx.put(key, listOf(lrev));
+            compactedKeysIdx.put(key, listOf(lastRev));
 
             NavigableMap<byte[], Value> compactedKv = compactedRevsIdx.computeIfAbsent(
-                    lrev,
+                    lastRev,
                     k -> new TreeMap<>(LEXICOGRAPHIC_COMPARATOR)
             );
 
@@ -215,6 +226,23 @@ public class SimpleInMemoryKeyValueStorage implements KeyValueStorage {
         }
     }
 
+    @NotNull
+    private Collection<Entry> doGetAll(List<byte[]> keys, long rev) {
+        assert keys != null : "keys list can't be null.";
+        assert !keys.isEmpty() : "keys list can't be empty.";
+        assert rev > 0 : "Revision must be positive.";
+
+        Collection<Entry> res = new ArrayList<>(keys.size());
+
+        synchronized (mux) {
+            for (byte[] key : keys) {
+                res.add(doGet(key, rev, false));
+            }
+        }
+
+        return res;
+    }
+
     /**
      * Returns entry for given key.
      *
diff --git a/modules/metastorage-server/src/test/java/org/apache/ignite/internal/metastorage/server/SimpleInMemoryKeyValueStorageTest.java b/modules/metastorage-server/src/test/java/org/apache/ignite/internal/metastorage/server/SimpleInMemoryKeyValueStorageTest.java
index 5b797fc..fa130e6 100644
--- a/modules/metastorage-server/src/test/java/org/apache/ignite/internal/metastorage/server/SimpleInMemoryKeyValueStorageTest.java
+++ b/modules/metastorage-server/src/test/java/org/apache/ignite/internal/metastorage/server/SimpleInMemoryKeyValueStorageTest.java
@@ -1,17 +1,18 @@
 package org.apache.ignite.internal.metastorage.server;
 
-import org.jetbrains.annotations.NotNull;
-import org.junit.jupiter.api.BeforeEach;
-import org.junit.jupiter.api.Test;
-
+import java.util.Collection;
 import java.util.Iterator;
+import java.util.List;
 import java.util.Map;
 import java.util.TreeMap;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import org.apache.ignite.metastorage.common.Key;
+import org.jetbrains.annotations.NotNull;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
 
-import static org.junit.jupiter.api.Assertions.assertArrayEquals;
-import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.junit.jupiter.api.Assertions.assertFalse;
-import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.junit.jupiter.api.Assertions.*;
 
 class SimpleInMemoryKeyValueStorageTest {
     private KeyValueStorage storage;
@@ -56,6 +57,193 @@ class SimpleInMemoryKeyValueStorageTest {
     }
 
     @Test
+    void getAll() {
+        byte[] key1 = k(1);
+        byte[] val1 = kv(1, 1);
+
+        byte[] key2 = k(2);
+        byte[] val2_1 = kv(2, 21);
+        byte[] val2_2 = kv(2, 22);
+
+        byte[] key3 = k(3);
+        byte[] val3 = kv(3, 3);
+
+        byte[] key4 = k(4);
+
+        assertEquals(0, storage.revision());
+        assertEquals(0, storage.updateCounter());
+
+        // Regular put.
+        storage.put(key1, val1);
+
+        // Rewrite.
+        storage.put(key2, val2_1);
+        storage.put(key2, val2_2);
+
+        // Remove.
+        storage.put(key3, val3);
+        storage.remove(key3);
+
+        assertEquals(5, storage.revision());
+        assertEquals(5, storage.updateCounter());
+
+        Collection<Entry> entries = storage.getAll(List.of(key1, key2, key3, key4));
+
+        assertEquals(4, entries.size());
+
+        Map<Key, Entry> map =  entries.stream().collect(Collectors.toMap(e -> new Key(e.key()), Function.identity()));
+
+        // Test regular put value.
+        Entry e1 = map.get(new Key(key1));
+
+        assertNotNull(e1);
+        assertEquals(1, e1.revision());
+        assertEquals(1, e1.updateCounter());
+        assertFalse(e1.tombstone());
+        assertFalse(e1.empty());
+        assertArrayEquals(val1, e1.value());
+
+        // Test rewritten value.
+        Entry e2 = map.get(new Key(key2));
+
+        assertNotNull(e2);
+        assertEquals(3, e2.revision());
+        assertEquals(3, e2.updateCounter());
+        assertFalse(e2.tombstone());
+        assertFalse(e2.empty());
+        assertArrayEquals(val2_2, e2.value());
+
+        // Test removed value.
+        Entry e3 = map.get(new Key(key3));
+
+        assertNotNull(e3);
+        assertEquals(5, e3.revision());
+        assertEquals(5, e3.updateCounter());
+        assertTrue(e3.tombstone());
+        assertFalse(e3.empty());
+
+        // Test empty value.
+        Entry e4 = map.get(new Key(key4));
+
+        assertNotNull(e4);
+        assertFalse(e4.tombstone());
+        assertTrue(e4.empty());
+    }
+
+    @Test
+    void getAllWithRevisionBound() {
+        byte[] key1 = k(1);
+        byte[] val1 = kv(1, 1);
+
+        byte[] key2 = k(2);
+        byte[] val2_1 = kv(2, 21);
+        byte[] val2_2 = kv(2, 22);
+
+        byte[] key3 = k(3);
+        byte[] val3 = kv(3, 3);
+
+        byte[] key4 = k(4);
+
+        assertEquals(0, storage.revision());
+        assertEquals(0, storage.updateCounter());
+
+        // Regular put.
+        storage.put(key1, val1);
+
+        // Rewrite.
+        storage.put(key2, val2_1);
+        storage.put(key2, val2_2);
+
+        // Remove.
+        storage.put(key3, val3);
+        storage.remove(key3);
+
+        assertEquals(5, storage.revision());
+        assertEquals(5, storage.updateCounter());
+
+        // Bounded by revision 2.
+        Collection<Entry> entries = storage.getAll(List.of(key1, key2, key3, key4), 2);
+
+        assertEquals(4, entries.size());
+
+        Map<Key, Entry> map =  entries.stream().collect(Collectors.toMap(e -> new Key(e.key()), Function.identity()));
+
+        // Test regular put value.
+        Entry e1 = map.get(new Key(key1));
+
+        assertNotNull(e1);
+        assertEquals(1, e1.revision());
+        assertEquals(1, e1.updateCounter());
+        assertFalse(e1.tombstone());
+        assertFalse(e1.empty());
+        assertArrayEquals(val1, e1.value());
+
+        // Test value that has not been rewritten yet.
+        Entry e2 = map.get(new Key(key2));
+
+        assertNotNull(e2);
+        assertEquals(2, e2.revision());
+        assertEquals(2, e2.updateCounter());
+        assertFalse(e2.tombstone());
+        assertFalse(e2.empty());
+        assertArrayEquals(val2_1, e2.value());
+
+        // Values with larger revision don't exist yet.
+        Entry e3 = map.get(new Key(key3));
+
+        assertNotNull(e3);
+        assertTrue(e3.empty());
+
+        Entry e4 = map.get(new Key(key4));
+
+        assertNotNull(e4);
+        assertTrue(e4.empty());
+
+        // Bounded by revision 4.
+        entries = storage.getAll(List.of(key1, key2, key3, key4), 4);
+
+        assertEquals(4, entries.size());
+
+        map =  entries.stream().collect(Collectors.toMap(e -> new Key(e.key()), Function.identity()));
+
+        // Test regular put value.
+        e1 = map.get(new Key(key1));
+
+        assertNotNull(e1);
+        assertEquals(1, e1.revision());
+        assertEquals(1, e1.updateCounter());
+        assertFalse(e1.tombstone());
+        assertFalse(e1.empty());
+        assertArrayEquals(val1, e1.value());
+
+        // Test rewritten value.
+        e2 = map.get(new Key(key2));
+
+        assertNotNull(e2);
+        assertEquals(3, e2.revision());
+        assertEquals(3, e2.updateCounter());
+        assertFalse(e2.tombstone());
+        assertFalse(e2.empty());
+        assertArrayEquals(val2_2, e2.value());
+
+        // Test value that has not been removed yet.
+        e3 = map.get(new Key(key3));
+
+        assertNotNull(e3);
+        assertEquals(4, e3.revision());
+        assertEquals(4, e3.updateCounter());
+        assertFalse(e3.tombstone());
+        assertFalse(e3.empty());
+        assertArrayEquals(val3, e3.value());
+
+        // Value with larger revision doesn't exist yet.
+        e4 = map.get(new Key(key4));
+
+        assertNotNull(e4);
+        assertTrue(e4.empty());
+    }
+
+    @Test
     public void getAndPut() {
         byte[] key = k(1);
         byte[] val = kv(1, 1);
@@ -208,7 +396,6 @@ class SimpleInMemoryKeyValueStorageTest {
     @Test
     public void getAndPutAfterRemove() {
         byte[] key = k(1);
-
         byte[] val = kv(1, 1);
 
         storage.getAndPut(key, val);
@@ -218,11 +405,8 @@ class SimpleInMemoryKeyValueStorageTest {
         Entry e = storage.getAndPut(key, val);
 
         assertEquals(3, storage.revision());
-
         assertEquals(3, storage.updateCounter());
-
         assertEquals(2, e.revision());
-
         assertTrue(e.tombstone());
     }
 

[ignite-3] 01/08: IGNITE-14389 Meta storage: in-memory implementation WIP

Posted by ag...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

agura pushed a commit to branch ignite-14389
in repository https://gitbox.apache.org/repos/asf/ignite-3.git

commit cd44be6843b0ddf8f55eacf2dfc8331bd6308408
Author: Andrey Gura <ag...@apache.org>
AuthorDate: Tue Mar 30 21:21:07 2021 +0300

    IGNITE-14389 Meta storage: in-memory implementation WIP
---
 modules/metastorage-server/pom.xml                 |  60 +++++
 .../ignite/internal/metastorage/server/Entry.java  | 146 +++++++++++
 .../metastorage/server/KeyValueStorage.java        |  28 +++
 .../server/SimpleInMemoryKeyValueStorage.java      | 267 ++++++++++++++++++++
 .../ignite/internal/metastorage/server/Watch.java  |  45 ++++
 .../internal/metastorage/server/Watcher.java       |  13 +
 .../internal/metastorage/server/WatcherImpl.java   |  58 +++++
 .../server/SimpleInMemoryKeyValueStorageTest.java  | 274 +++++++++++++++++++++
 pom.xml                                            |   1 +
 9 files changed, 892 insertions(+)

diff --git a/modules/metastorage-server/pom.xml b/modules/metastorage-server/pom.xml
new file mode 100644
index 0000000..3c51fc5
--- /dev/null
+++ b/modules/metastorage-server/pom.xml
@@ -0,0 +1,60 @@
+<?xml version="1.0" encoding="UTF-8"?>
+
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements.  See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+  -->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+
+    <parent>
+        <groupId>org.apache.ignite</groupId>
+        <artifactId>ignite-parent</artifactId>
+        <version>1</version>
+        <relativePath>../../parent/pom.xml</relativePath>
+    </parent>
+
+    <artifactId>metastorage-server</artifactId>
+    <version>3.0.0-SNAPSHOT</version>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.ignite</groupId>
+            <artifactId>ignite-core</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.jetbrains</groupId>
+            <artifactId>annotations</artifactId>
+        </dependency>
+
+        <!-- Test dependencies. -->
+        <dependency>
+            <groupId>org.junit.jupiter</groupId>
+            <artifactId>junit-jupiter-api</artifactId>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.junit.jupiter</groupId>
+            <artifactId>junit-jupiter-engine</artifactId>
+            <scope>test</scope>
+        </dependency>
+    </dependencies>
+</project>
diff --git a/modules/metastorage-server/src/main/java/org/apache/ignite/internal/metastorage/server/Entry.java b/modules/metastorage-server/src/main/java/org/apache/ignite/internal/metastorage/server/Entry.java
new file mode 100644
index 0000000..442aef9
--- /dev/null
+++ b/modules/metastorage-server/src/main/java/org/apache/ignite/internal/metastorage/server/Entry.java
@@ -0,0 +1,146 @@
+package org.apache.ignite.internal.metastorage.server;
+
+import org.jetbrains.annotations.NotNull;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Represents a storage unit as entry with key, value and revision, where
+ * <ul>
+ *     <li>key - a unique entry key represented by an array of bytes. Keys are compared lexicographically.</li>
+ *     <li>value - data which is associated with a key and represented as an array of bytes.</li>
+ *     <li>revision - a number which denotes a version of the whole meta storage. Each change increments the revision.</li>
+ * </ul>
+ *
+ * An instance of {@link Entry} can represent:
+ * <ul>
+ *     <li>A regular entry which stores a particular key, a value and a revision number.</li>
+ *     <li>An empty entry which denotes the absence of a regular entry in the meta storage for a given key.
+ *     The revision is 0 for this kind of entry.</li>
+ *     <li>A tombstone entry which denotes that a regular entry for a given key was removed from storage on some revision.</li>
+ * </ul>
+ */
+//TODO: Separate client and server entries. Empty and tombstone for client is the same.
+public class Entry {
+    /** Entry key. Couldn't be {@code null}. */
+    @NotNull
+    final private byte[] key;
+
+    /**
+     * Entry value.
+     * <p>
+     *     {@code val == null} only for {@link #empty()} and {@link #tombstone()} entries.
+     * </p>
+     */
+    @Nullable
+    final private byte[] val;
+
+    /**
+     * Revision number corresponding to this particular entry.
+     * <p>
+     *     {@code rev == 0} for {@link #empty()} entry,
+     *     {@code rev > 0} for regular and {@link #tombstone()} entries.
+     * </p>
+     */
+    final private long rev;
+
+    /**
+     * Constructor.
+     *
+     * @param key Key bytes. Couldn't be {@code null}.
+     * @param val Value bytes. Couldn't be {@code null}.
+     * @param rev Revision.
+     */
+    // TODO: It seems user will never create Entry, so we can reduce constructor scope to protected or package-private and reuse it from two-place private constructor.
+    public Entry(@NotNull byte[] key, @NotNull byte[] val, long rev) {
+        assert key != null : "key can't be null";
+        assert val != null : "value can't be null";
+
+        this.key = key;
+        this.val = val;
+        this.rev = rev;
+    }
+
+    /**
+     * Constructor for empty and tombstone entries.
+     *
+     * @param key Key bytes. Couldn't be {@code null}.
+     * @param rev Revision.
+     */
+    private Entry(@NotNull byte[] key, long rev) {
+        assert key != null : "key can't be null";
+
+        this.key = key;
+        this.val = null;
+        this.rev = rev;
+    }
+
+    /**
+     * Creates an instance of empty entry for a given key.
+     *
+     * @param key Key bytes. Couldn't be {@code null}.
+     * @return Empty entry.
+     */
+    @NotNull
+    public static Entry empty(byte[] key) {
+        return new Entry(key, 0);
+    }
+
+    /**
+     * Creates an instance of tombstone entry for a given key and a revision.
+     *
+     * @param key Key bytes. Couldn't be {@code null}.
+     * @return Tombstone entry.
+     */
+    @NotNull
+    public static Entry tombstone(byte[] key, long rev) {
+        assert rev > 0 : "rev must be positive for tombstone entry.";
+
+        return new Entry(key, rev);
+    }
+
+    /**
+     * Returns a key.
+     *
+     * @return Key.
+     */
+    @NotNull
+    public byte[] key() {
+        return key;
+    }
+
+    /**
+     * Returns a value.
+     *
+     * @return Value.
+     */
+    @Nullable
+    public byte[] value() {
+        return val;
+    }
+
+    /**
+     * Returns a revision.
+     * @return Revision.
+     */
+    public long revision() {
+        return rev;
+    }
+
+    /**
+     * Returns value which denotes whether entry is tombstone or not.
+     *
+     * @return {@code True} if entry is tombstone, otherwise - {@code false}.
+     */
+    public boolean tombstone() {
+        return val == null && rev > 0;
+    }
+
+    /**
+     * Returns value which denotes whether entry is empty or not.
+     *
+     * @return {@code True} if entry is empty, otherwise - {@code false}.
+     */
+    public boolean empty() {
+        return val == null && rev == 0;
+    }
+}
diff --git a/modules/metastorage-server/src/main/java/org/apache/ignite/internal/metastorage/server/KeyValueStorage.java b/modules/metastorage-server/src/main/java/org/apache/ignite/internal/metastorage/server/KeyValueStorage.java
new file mode 100644
index 0000000..1bf6b78
--- /dev/null
+++ b/modules/metastorage-server/src/main/java/org/apache/ignite/internal/metastorage/server/KeyValueStorage.java
@@ -0,0 +1,28 @@
+package org.apache.ignite.internal.metastorage.server;
+
+import org.jetbrains.annotations.NotNull;
+
+import java.util.Iterator;
+
+public interface KeyValueStorage {
+
+    long revision();
+
+    @NotNull
+    Entry put(byte[] key, byte[] value);
+
+    @NotNull
+    Entry get(byte[] key);
+
+    @NotNull
+    Entry get(byte[] key, long rev);
+
+    @NotNull
+    Entry remove(byte[] key);
+
+    Iterator<Entry> iterate(byte[] key);
+
+    //Iterator<Entry> iterate(long rev);
+
+    void compact();
+}
diff --git a/modules/metastorage-server/src/main/java/org/apache/ignite/internal/metastorage/server/SimpleInMemoryKeyValueStorage.java b/modules/metastorage-server/src/main/java/org/apache/ignite/internal/metastorage/server/SimpleInMemoryKeyValueStorage.java
new file mode 100644
index 0000000..9059aec
--- /dev/null
+++ b/modules/metastorage-server/src/main/java/org/apache/ignite/internal/metastorage/server/SimpleInMemoryKeyValueStorage.java
@@ -0,0 +1,267 @@
+package org.apache.ignite.internal.metastorage.server;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Comparator;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.NavigableMap;
+import java.util.NoSuchElementException;
+import java.util.TreeMap;
+import org.jetbrains.annotations.NotNull;
+import org.jetbrains.annotations.TestOnly;
+
+/**
+ * WARNING: Only for test purposes and only for non-distributed (one static instance) storage.
+ */
+public class SimpleInMemoryKeyValueStorage implements KeyValueStorage {
+    private static final Comparator<byte[]> LEXICOGRAPHIC_COMPARATOR = Arrays::compare;
+
+    private static final byte[] TOMBSTONE = new byte[0];
+
+    private static final long LATEST_REV = -1;
+
+    private final Watcher watcher;
+
+    private NavigableMap<byte[], List<Long>> keysIdx = new TreeMap<>(LEXICOGRAPHIC_COMPARATOR);
+
+    private NavigableMap<Long, NavigableMap<byte[], byte[]>> revsIdx = new TreeMap<>();
+
+    private long grev = 0;
+
+    private final Object mux = new Object();
+
+    public SimpleInMemoryKeyValueStorage(Watcher watcher) {
+        this.watcher = watcher;
+    }
+
+    @Override public long revision() {
+        return grev;
+    }
+
+    @NotNull
+    @Override public Entry put(byte[] key, byte[] val) {
+        synchronized (mux) {
+            long crev = ++grev;
+
+            // Update keysIdx.
+            List<Long> revs = keysIdx.computeIfAbsent(key, k -> new ArrayList<>());
+
+            long lrev = revs.isEmpty() ? 0 : lastRevision(revs);
+
+            revs.add(crev);
+
+            // Update revsIdx.
+            NavigableMap<byte[], byte[]> entries = new TreeMap<>(LEXICOGRAPHIC_COMPARATOR);
+
+            entries.put(key, val);
+
+            revsIdx.put(crev, entries);
+
+            // Return previous value.
+            if (lrev == 0)
+                return Entry.empty(key);
+
+            NavigableMap<byte[], byte[]> lastVal = revsIdx.get(lrev);
+
+            Entry res = new Entry(key, lastVal.get(key), lrev);
+
+            //TODO: notify watchers
+
+            return res;
+        }
+    }
+
+    @NotNull
+    @Override public Entry get(byte[] key) {
+        synchronized (mux) {
+            return doGet(key, LATEST_REV);
+        }
+    }
+
+    @NotNull
+    @TestOnly
+    @Override public Entry get(byte[] key, long rev) {
+        synchronized (mux) {
+            return doGet(key, rev);
+        }
+    }
+
+    @NotNull
+    @Override public Entry remove(byte[] key) {
+        synchronized (mux) {
+            Entry e = doGet(key, LATEST_REV);
+
+            if (e.value() == null)
+                return e;
+
+            return put(key, TOMBSTONE);
+        }
+    }
+
+    @Override public Iterator<Entry> iterate(byte[] keyFrom) {
+        synchronized (mux) {
+            NavigableMap<byte[], List<Long>> tailMap = keysIdx.tailMap(keyFrom, true);
+
+            final Iterator<Map.Entry<byte[], List<Long>>> it = tailMap.entrySet().iterator();
+
+            return new Iterator<>() {
+                private Map.Entry<byte[], List<Long>> curr;
+                private boolean hasNext;
+
+                private void advance() {
+                    if (it.hasNext()) {
+                        Map.Entry<byte[], List<Long>> e = it.next();
+
+                        byte[] key = e.getKey();
+
+                        if (!isPrefix(keyFrom, key))
+                            hasNext = false;
+                        else {
+                            curr = e;
+
+                            hasNext = true;
+                        }
+                    } else
+                        hasNext = false;
+                }
+
+                @Override
+                public boolean hasNext() {
+                    synchronized (mux) {
+                        if (curr == null)
+                            advance();
+
+                        return hasNext;
+                    }
+                }
+
+                @Override
+                public Entry next() {
+                    synchronized (mux) {
+                        if (!hasNext())
+                            throw new NoSuchElementException();
+
+                        Map.Entry<byte[], List<Long>> e = curr;
+
+                        curr = null;
+
+                        byte[] key = e.getKey();
+
+                        List<Long> revs = e.getValue();
+
+                        long rev = revs == null || revs.isEmpty() ? 0 : lastRevision(revs);
+
+                        if (rev == 0) {
+                            throw new IllegalStateException("rev == 0");
+                            //return new AbstractMap.SimpleImmutableEntry<>(key, null);
+                        }
+
+                        NavigableMap<byte[], byte[]> vals = revsIdx.get(rev);
+
+                        if (vals == null || vals.isEmpty()) {
+                            throw new IllegalStateException("vals == null || vals.isEmpty()");
+                            //return new AbstractMap.SimpleImmutableEntry<>(key, null);
+                        }
+
+                        byte[] val = vals.get(key);
+
+                        return val == TOMBSTONE ? Entry.tombstone(key, rev) : new Entry(key, val, rev);
+                    }
+                }
+            };
+        }
+    }
+
+    @Override public void compact() {
+        synchronized (mux) {
+            NavigableMap<byte[], List<Long>> compactedKeysIdx = new TreeMap<>(LEXICOGRAPHIC_COMPARATOR);
+
+            NavigableMap<Long, NavigableMap<byte[], byte[]>> compactedRevsIdx = new TreeMap<>();
+
+            keysIdx.forEach((key, revs) -> compactForKey(key, revs, compactedKeysIdx, compactedRevsIdx));
+
+            keysIdx = compactedKeysIdx;
+
+            revsIdx = compactedRevsIdx;
+        }
+    }
+
+    private void compactForKey(
+            byte[] key,
+            List<Long> revs,
+            NavigableMap<byte[], List<Long>> compactedKeysIdx,
+            NavigableMap<Long, NavigableMap<byte[], byte[]>> compactedRevsIdx
+    ) {
+        Long lrev = lastRevision(revs);
+
+        NavigableMap<byte[], byte[]> kv = revsIdx.get(lrev);
+
+        byte[] lastVal = kv.get(key);
+
+        if (lastVal != TOMBSTONE) {
+            compactedKeysIdx.put(key, listOf(lrev));
+
+            NavigableMap<byte[], byte[]> compactedKv = compactedRevsIdx.computeIfAbsent(
+                    lrev,
+                    k -> new TreeMap<>(LEXICOGRAPHIC_COMPARATOR)
+            );
+
+            compactedKv.put(key, lastVal);
+        }
+    }
+
+    /**
+     * Returns entry for given key.
+     *
+     * @param key Key.
+     * @param rev Revision.
+     * @return Entry for given key.
+     */
+    @NotNull private Entry doGet(byte[] key, long rev) {
+        List<Long> revs = keysIdx.get(key);
+
+        if (revs == null || revs.isEmpty())
+            return Entry.empty(key);
+
+        long lrev = rev == LATEST_REV ? lastRevision(revs) : rev;
+
+        NavigableMap<byte[], byte[]> entries = revsIdx.get(lrev);
+
+        if (entries == null || entries.isEmpty())
+            return Entry.empty(key);
+
+        byte[] val = entries.get(key);
+
+        if (val == TOMBSTONE)
+            return Entry.tombstone(key, lrev);
+
+        return new Entry(key, val , lrev);
+    }
+
+    private static boolean isPrefix(byte[] pref, byte[] term) {
+        if (pref.length > term.length)
+            return false;
+        // Every byte of pref, including the last one, must match the corresponding leading byte of term.
+        for (int i = 0; i < pref.length; i++) {
+            if (pref[i] != term[i])
+                return false;
+        }
+
+        return true;
+    }
+
+    private static long lastRevision(List<Long> revs) {
+        return revs.get(revs.size() - 1);
+    }
+
+    private static  List<Long> listOf(long val) {
+        List<Long> res = new ArrayList<>();
+
+        res.add(val);
+
+        return res;
+    }
+
+}
diff --git a/modules/metastorage-server/src/main/java/org/apache/ignite/internal/metastorage/server/Watch.java b/modules/metastorage-server/src/main/java/org/apache/ignite/internal/metastorage/server/Watch.java
new file mode 100644
index 0000000..26cfa5c
--- /dev/null
+++ b/modules/metastorage-server/src/main/java/org/apache/ignite/internal/metastorage/server/Watch.java
@@ -0,0 +1,45 @@
+package org.apache.ignite.internal.metastorage.server;
+
+import org.jetbrains.annotations.Nullable;
+
+import java.util.Arrays;
+import java.util.Comparator;
+
+public class Watch {
+    private static final Comparator<byte[]> CMP = Arrays::compare;
+
+    private static final long ANY_REVISION = -1;
+
+    @Nullable
+    private byte[] startKey;
+
+    @Nullable
+    private byte[] endKey;
+
+    long rev = ANY_REVISION;
+
+    public void startKey(byte[] startKey) {
+        this.startKey = startKey;
+    }
+
+    public void endKey(byte[] endKey) {
+        this.endKey = endKey;
+    }
+
+    public void revision(long rev) {
+        this.rev = rev;
+    }
+
+    public void notify(Entry e) {
+        if (startKey != null && CMP.compare(e.key(), startKey) < 0)
+            return;
+
+        if (endKey != null && CMP.compare(e.key(), endKey) > 0)
+            return;
+
+        if (rev != ANY_REVISION && e.revision() <= rev)
+            return;
+
+        System.out.println("Entry: key=" + new String(e.key()) + ", value=" + new String(e.value()) + ", rev=" + e.revision());
+    }
+}
diff --git a/modules/metastorage-server/src/main/java/org/apache/ignite/internal/metastorage/server/Watcher.java b/modules/metastorage-server/src/main/java/org/apache/ignite/internal/metastorage/server/Watcher.java
new file mode 100644
index 0000000..5516d06
--- /dev/null
+++ b/modules/metastorage-server/src/main/java/org/apache/ignite/internal/metastorage/server/Watcher.java
@@ -0,0 +1,13 @@
+package org.apache.ignite.internal.metastorage.server;
+
+import org.jetbrains.annotations.NotNull;
+
+public interface Watcher {
+    void register(@NotNull Watch watch);
+
+    void notify(@NotNull Entry e);
+
+    //TODO: implement
+    void cancel(@NotNull Watch watch);
+}
+
diff --git a/modules/metastorage-server/src/main/java/org/apache/ignite/internal/metastorage/server/WatcherImpl.java b/modules/metastorage-server/src/main/java/org/apache/ignite/internal/metastorage/server/WatcherImpl.java
new file mode 100644
index 0000000..dc126a0
--- /dev/null
+++ b/modules/metastorage-server/src/main/java/org/apache/ignite/internal/metastorage/server/WatcherImpl.java
@@ -0,0 +1,58 @@
+package org.apache.ignite.internal.metastorage.server;
+
+import org.jetbrains.annotations.NotNull;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+
+public class WatcherImpl implements Watcher {
+    private final BlockingQueue<Entry> queue = new LinkedBlockingQueue<>();
+
+    private final List<Watch> watches = new ArrayList<>();
+
+    private volatile boolean stop;
+
+    private final Object mux = new Object();
+
+    @Override public void register(@NotNull Watch watch) {
+        synchronized (mux) {
+            watches.add(watch);
+        }
+    }
+
+    @Override public void notify(@NotNull Entry e) {
+        queue.offer(e);
+    }
+
+    @Override
+    public void cancel(@NotNull Watch watch) {
+        throw new UnsupportedOperationException("Not implemented yet.");
+    }
+
+    public void shutdown() {
+        stop = true;
+    }
+
+    private class WatcherWorker implements Runnable {
+        @Override public void run() {
+            while (!stop) {
+                try {
+                    Entry e = queue.poll(100, TimeUnit.MILLISECONDS);
+
+                    if (e != null) {
+                        synchronized (mux) {
+                            watches.forEach(w -> w.notify(e));
+                        }
+                    }
+                }
+                catch (InterruptedException interruptedException) {
+                    // No-op.
+                }
+            }
+        }
+    }
+}
+
diff --git a/modules/metastorage-server/src/test/java/org/apache/ignite/internal/metastorage/server/SimpleInMemoryKeyValueStorageTest.java b/modules/metastorage-server/src/test/java/org/apache/ignite/internal/metastorage/server/SimpleInMemoryKeyValueStorageTest.java
new file mode 100644
index 0000000..f7fb17e
--- /dev/null
+++ b/modules/metastorage-server/src/test/java/org/apache/ignite/internal/metastorage/server/SimpleInMemoryKeyValueStorageTest.java
@@ -0,0 +1,274 @@
+package org.apache.ignite.internal.metastorage.server;
+
+import org.jetbrains.annotations.NotNull;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.util.Iterator;
+import java.util.Map;
+import java.util.TreeMap;
+
+import static org.junit.jupiter.api.Assertions.assertArrayEquals;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/**
+ * Tests for {@link SimpleInMemoryKeyValueStorage}: put/get/remove semantic, compaction and prefix iteration.
+ */
+class SimpleInMemoryKeyValueStorageTest {
+    /** Storage under test, recreated before each test. */
+    private KeyValueStorage storage;
+
+    /**
+     * Creates a fresh storage with a watcher which ignores all calls.
+     */
+    @BeforeEach
+    public void setUp() {
+        storage = new SimpleInMemoryKeyValueStorage(new NoOpWatcher());
+    }
+
+    /**
+     * Checks entries returned by put/get/remove and the storage revision after each operation, including removal
+     * of already removed entries and compaction of tombstones.
+     */
+    @Test
+    void putGetRemoveCompact() {
+        byte[] key1 = k(1);
+        byte[] val1_1 = kv(1, 1);
+        byte[] val1_3 = kv(1, 3);
+
+        byte[] key2 = k(2);
+        byte[] val2_2 = kv(2, 2);
+
+        assertEquals(0, storage.revision());
+
+        // Previous entry is empty.
+        Entry emptyEntry = storage.put(key1, val1_1);
+
+        assertEquals(1, storage.revision());
+        assertTrue(emptyEntry.empty());
+
+        // Entry with rev == 1.
+        Entry e1_1 = storage.get(key1);
+
+        assertFalse(e1_1.empty());
+        assertFalse(e1_1.tombstone());
+        assertArrayEquals(key1, e1_1.key());
+        assertArrayEquals(val1_1, e1_1.value());
+        assertEquals(1, e1_1.revision());
+        assertEquals(1, storage.revision());
+
+        // Previous entry is empty.
+        emptyEntry = storage.put(key2, val2_2);
+
+        assertEquals(2, storage.revision());
+        assertTrue(emptyEntry.empty());
+
+        // Entry with rev == 2.
+        Entry e2 = storage.get(key2);
+
+        assertFalse(e2.empty());
+        assertFalse(e2.tombstone());
+        assertArrayEquals(key2, e2.key());
+        assertArrayEquals(val2_2, e2.value());
+        assertEquals(2, e2.revision());
+        assertEquals(2, storage.revision());
+
+        // Previous entry is not empty.
+        e1_1 = storage.put(key1, val1_3);
+
+        assertFalse(e1_1.empty());
+        assertFalse(e1_1.tombstone());
+        assertArrayEquals(key1, e1_1.key());
+        assertArrayEquals(val1_1, e1_1.value());
+        assertEquals(1, e1_1.revision());
+        assertEquals(3, storage.revision());
+
+        // Entry with rev == 3.
+        Entry e1_3 = storage.get(key1);
+
+        assertFalse(e1_3.empty());
+        assertFalse(e1_3.tombstone());
+        assertArrayEquals(key1, e1_3.key());
+        assertArrayEquals(val1_3, e1_3.value());
+        assertEquals(3, e1_3.revision());
+        assertEquals(3, storage.revision());
+
+        // Remove existing entry.
+        Entry e2_2 = storage.remove(key2);
+
+        assertFalse(e2_2.empty());
+        assertFalse(e2_2.tombstone());
+        assertArrayEquals(key2, e2_2.key());
+        assertArrayEquals(val2_2, e2_2.value());
+        assertEquals(2, e2_2.revision());
+        assertEquals(4, storage.revision()); // Storage revision is changed.
+
+        // Remove already removed entry.
+        Entry tombstoneEntry = storage.remove(key2);
+
+        assertFalse(tombstoneEntry.empty());
+        assertTrue(tombstoneEntry.tombstone());
+        assertEquals(4, storage.revision()); // Storage revision is not changed.
+
+        // Compact and check that tombstones are removed.
+        storage.compact();
+
+        assertEquals(4, storage.revision());
+        assertTrue(storage.remove(key2).empty());
+        assertTrue(storage.get(key2).empty());
+
+        // Remove existing entry.
+        e1_3 = storage.remove(key1);
+
+        assertFalse(e1_3.empty());
+        assertFalse(e1_3.tombstone());
+        assertArrayEquals(key1, e1_3.key());
+        assertArrayEquals(val1_3, e1_3.value());
+        assertEquals(3, e1_3.revision());
+        assertEquals(5, storage.revision()); // Storage revision is changed.
+
+        // Remove already removed entry.
+        tombstoneEntry = storage.remove(key1);
+
+        assertFalse(tombstoneEntry.empty());
+        assertTrue(tombstoneEntry.tombstone());
+        assertEquals(5, storage.revision()); // Storage revision is not changed.
+
+        // Compact and check that tombstones are removed.
+        storage.compact();
+
+        assertEquals(5, storage.revision());
+        assertTrue(storage.remove(key1).empty());
+        assertTrue(storage.get(key1).empty());
+    }
+
+    /**
+     * Checks that compaction keeps the storage revision, keeps the latest value of every live key and drops
+     * both outdated revisions and removed keys.
+     */
+    @Test
+    void compact() {
+        assertEquals(0, storage.revision());
+
+        // Compact empty.
+        storage.compact();
+
+        assertEquals(0, storage.revision());
+
+        // Compact non-empty.
+        fill(storage, 1, 1);
+
+        assertEquals(1, storage.revision());
+
+        fill(storage, 2, 2);
+
+        assertEquals(3, storage.revision());
+
+        fill(storage, 3, 3);
+
+        assertEquals(6, storage.revision());
+
+        storage.remove(k(3));
+
+        assertEquals(7, storage.revision());
+        assertTrue(storage.get(k(3)).tombstone());
+
+        storage.compact();
+
+        assertEquals(7, storage.revision());
+
+        Entry e1 = storage.get(k(1));
+
+        assertFalse(e1.empty());
+        assertFalse(e1.tombstone());
+        assertArrayEquals(k(1), e1.key());
+        assertArrayEquals(kv(1,1), e1.value());
+        assertEquals(1, e1.revision());
+
+        Entry e2 = storage.get(k(2));
+
+        assertFalse(e2.empty());
+        assertFalse(e2.tombstone());
+        assertArrayEquals(k(2), e2.key());
+        assertArrayEquals(kv(2,2), e2.value());
+        assertTrue(storage.get(k(2), 2).empty());
+        assertEquals(3, e2.revision());
+
+        Entry e3 = storage.get(k(3));
+
+        assertTrue(e3.empty());
+        assertTrue(storage.get(k(3), 5).empty());
+        assertTrue(storage.get(k(3), 6).empty());
+        assertTrue(storage.get(k(3), 7).empty());
+    }
+
+    /**
+     * Fills the storage with three groups of keys with different prefixes and checks iteration over each group.
+     */
+    @Test
+    void iterate() {
+        TreeMap<String, String> expFooMap = new TreeMap<>();
+        TreeMap<String, String> expKeyMap = new TreeMap<>();
+        TreeMap<String, String> expZooMap = new TreeMap<>();
+
+        fill("foo", storage, expFooMap);
+        fill("key", storage, expKeyMap);
+        fill("zoo", storage, expZooMap);
+
+        assertEquals(300, storage.revision());
+
+        assertIterate("key", storage, expKeyMap);
+        assertIterate("zoo", storage, expZooMap);
+        assertIterate("foo", storage, expFooMap);
+    }
+
+    /**
+     * Asserts that iteration started from the key {@code pref + '_'} returns exactly the entries of the expected
+     * map, in the order of the expected map.
+     *
+     * @param pref Key prefix (without the trailing underscore).
+     * @param storage Storage to iterate over.
+     * @param expMap Expected keys and values.
+     */
+    private void assertIterate(String pref,  KeyValueStorage storage, TreeMap<String, String> expMap) {
+        Iterator<Entry> it = storage.iterate((pref + "_").getBytes());
+        Iterator<Map.Entry<String, String>> expIt = expMap.entrySet().iterator();
+
+        // Order.
+        while (it.hasNext()) {
+            Entry entry = it.next();
+
+            // Fail with an assertion rather than NoSuchElementException if the storage returns extra entries.
+            assertTrue(expIt.hasNext());
+
+            Map.Entry<String, String> expEntry = expIt.next();
+
+            assertEquals(expEntry.getKey(), new String(entry.key()));
+            assertEquals(expEntry.getValue(), new String(entry.value()));
+        }
+
+        // All expected entries must be returned: without this check an empty iterator would pass the test.
+        assertFalse(expIt.hasNext());
+
+        // Range boundaries.
+        it = storage.iterate((pref + '_').getBytes());
+
+        while (it.hasNext()) {
+            Entry entry = it.next();
+
+            assertTrue(expMap.containsKey(new String(entry.key())));
+        }
+    }
+
+    /**
+     * Puts 100 entries {@code pref_i -> val_i} to the storage and records them in the expected map.
+     *
+     * @param pref Key prefix (without the trailing underscore).
+     * @param storage Storage to fill.
+     * @param expMap Map which receives the same keys and values as strings.
+     */
+    private static void fill(String pref, KeyValueStorage storage, TreeMap<String, String> expMap) {
+        for (int i = 0; i < 100; i++) {
+            String keyStr = pref + '_' + i;
+
+            String valStr = "val_" + i;
+
+            expMap.put(keyStr, valStr);
+
+            byte[] key = keyStr.getBytes();
+
+            byte[] val = valStr.getBytes();
+
+            storage.put(key, val);
+        }
+    }
+
+    /**
+     * Puts {@code num} consecutive values for the same key, so the key gets {@code num} revisions.
+     *
+     * @param storage Storage to fill.
+     * @param keySuffix Key suffix, see {@link #k(int)}.
+     * @param num Number of values to put.
+     */
+    private static void fill(KeyValueStorage storage, int keySuffix, int num) {
+        for (int i = 0; i < num; i++)
+            storage.put(k(keySuffix), kv(keySuffix, i + 1));
+    }
+
+    /**
+     * @param k Key suffix.
+     * @return Bytes of the string {@code "key" + k}.
+     */
+    private static byte[] k(int k) {
+        return ("key" + k).getBytes();
+    }
+
+    /**
+     * @param k Key suffix.
+     * @param v Value suffix.
+     * @return Bytes of the string {@code "key" + k + "_val" + v}.
+     */
+    private static byte[] kv(int k, int v) {
+        return ("key" + k + '_' + "val" + v).getBytes();
+    }
+
+    /**
+     * Watcher which ignores all calls.
+     */
+    private static class NoOpWatcher implements Watcher {
+        @Override public void register(@NotNull Watch watch) {
+            // No-op.
+        }
+
+        @Override public void notify(@NotNull Entry e) {
+            // No-op.
+        }
+
+        @Override public void cancel(@NotNull Watch watch) {
+            // No-op.
+        }
+    }
+}
\ No newline at end of file
diff --git a/pom.xml b/pom.xml
index 0f89644..3e7da2e 100644
--- a/pom.xml
+++ b/pom.xml
@@ -46,6 +46,7 @@
         <module>modules/metastorage</module>
         <module>modules/metastorage-client</module>
         <module>modules/metastorage-common</module>
+        <module>modules/metastorage-server</module>
         <module>modules/network</module>
         <module>modules/raft</module>
         <module>modules/raft-client</module>

[ignite-3] 03/08: IGNITE-14389 Added get and do smth semantic

Posted by ag...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

agura pushed a commit to branch ignite-14389
in repository https://gitbox.apache.org/repos/asf/ignite-3.git

commit 52271bed844e2b7451d838ece15572fa9d12dd89
Author: Andrey Gura <ag...@apache.org>
AuthorDate: Tue Apr 6 22:25:03 2021 +0300

    IGNITE-14389 Added get and do smth semantic
---
 .../metastorage/server/KeyValueStorage.java        |  12 +-
 .../server/SimpleInMemoryKeyValueStorage.java      | 147 +++++++++----
 .../server/SimpleInMemoryKeyValueStorageTest.java  | 235 +++++++++++++++++++--
 3 files changed, 329 insertions(+), 65 deletions(-)

diff --git a/modules/metastorage-server/src/main/java/org/apache/ignite/internal/metastorage/server/KeyValueStorage.java b/modules/metastorage-server/src/main/java/org/apache/ignite/internal/metastorage/server/KeyValueStorage.java
index e245e08..ead1043 100644
--- a/modules/metastorage-server/src/main/java/org/apache/ignite/internal/metastorage/server/KeyValueStorage.java
+++ b/modules/metastorage-server/src/main/java/org/apache/ignite/internal/metastorage/server/KeyValueStorage.java
@@ -11,16 +11,20 @@ public interface KeyValueStorage {
     long updateCounter();
 
     @NotNull
-    Entry put(byte[] key, byte[] value);
-
-    @NotNull
     Entry get(byte[] key);
 
     @NotNull
     Entry get(byte[] key, long rev);
 
+    void put(byte[] key, byte[] value);
+
+    @NotNull
+    Entry getAndPut(byte[] key, byte[] value);
+
+    void remove(byte[] key);
+
     @NotNull
-    Entry remove(byte[] key);
+    Entry getAndRemove(byte[] key);
 
     Iterator<Entry> iterate(byte[] key);
 
diff --git a/modules/metastorage-server/src/main/java/org/apache/ignite/internal/metastorage/server/SimpleInMemoryKeyValueStorage.java b/modules/metastorage-server/src/main/java/org/apache/ignite/internal/metastorage/server/SimpleInMemoryKeyValueStorage.java
index b764998..3700f4a 100644
--- a/modules/metastorage-server/src/main/java/org/apache/ignite/internal/metastorage/server/SimpleInMemoryKeyValueStorage.java
+++ b/modules/metastorage-server/src/main/java/org/apache/ignite/internal/metastorage/server/SimpleInMemoryKeyValueStorage.java
@@ -46,49 +46,26 @@ public class SimpleInMemoryKeyValueStorage implements KeyValueStorage {
         return updCntr;
     }
 
-    @NotNull
-    @Override public Entry put(byte[] key, byte[] bytes) {
+    @Override public void put(byte[] key, byte[] value) {
         synchronized (mux) {
-            long curRev = ++rev;
-
-            long curUpdCntr = ++updCntr;
-
-            // Update keysIdx.
-            List<Long> revs = keysIdx.computeIfAbsent(key, k -> new ArrayList<>());
-
-            long lastRev = revs.isEmpty() ? 0 : lastRevision(revs);
-
-            revs.add(curRev);
-
-            // Update revsIdx.
-            NavigableMap<byte[], Value> entries = new TreeMap<>(LEXICOGRAPHIC_COMPARATOR);
-
-            Value val = new Value(bytes, curUpdCntr);
-
-            entries.put(key, val);
+            doPut(key, value);
+        }
+    }
 
-            revsIdx.put(curRev, entries);
+    @NotNull
+    @Override public Entry getAndPut(byte[] key, byte[] bytes) {
+        synchronized (mux) {
+            long lastRev = doPut(key, bytes);
 
             // Return previous value.
-            if (lastRev == 0)
-                return Entry.empty(key);
-
-            NavigableMap<byte[], Value> lastRevVals = revsIdx.get(lastRev);
-
-            Value lastVal = lastRevVals.get(key);
-
-            Entry res = new Entry(key, lastVal.bytes(), lastRev, lastVal.updateCounter());
-
-            //TODO: notify watchers
-
-            return res;
+            return doGetValue(key, lastRev);
         }
     }
 
     @NotNull
     @Override public Entry get(byte[] key) {
         synchronized (mux) {
-            return doGet(key, LATEST_REV);
+            return doGet(key, LATEST_REV, false);
         }
     }
 
@@ -96,19 +73,31 @@ public class SimpleInMemoryKeyValueStorage implements KeyValueStorage {
     @TestOnly
     @Override public Entry get(byte[] key, long rev) {
         synchronized (mux) {
-            return doGet(key, rev);
+            return doGet(key, rev, true);
+        }
+    }
+
+    @Override
+    public void remove(byte[] key) {
+        synchronized (mux) {
+            Entry e = doGet(key, LATEST_REV, false);
+
+            if (e.empty() || e.tombstone())
+                return;
+
+            doPut(key, TOMBSTONE);
         }
     }
 
     @NotNull
-    @Override public Entry remove(byte[] key) {
+    @Override public Entry getAndRemove(byte[] key) {
         synchronized (mux) {
-            Entry e = doGet(key, LATEST_REV);
+            Entry e = doGet(key, LATEST_REV, false);
 
-            if (e.value() == null)
+            if (e.empty() || e.tombstone())
                 return e;
 
-            return put(key, TOMBSTONE);
+            return getAndPut(key, TOMBSTONE);
         }
     }
 
@@ -233,25 +222,91 @@ public class SimpleInMemoryKeyValueStorage implements KeyValueStorage {
      * @param rev Revision.
      * @return Entry for given key.
      */
-    @NotNull private Entry doGet(byte[] key, long rev) {
+    @NotNull
+    private Entry doGet(byte[] key, long rev, boolean exactRev) {
+        assert rev == LATEST_REV && !exactRev || rev > LATEST_REV :
+                "Invalid arguments: [rev=" + rev + ", exactRev=" + exactRev + ']';
+
         List<Long> revs = keysIdx.get(key);
 
         if (revs == null || revs.isEmpty())
             return Entry.empty(key);
 
-        long lrev = rev == LATEST_REV ? lastRevision(revs) : rev;
+        long lastRev;
 
-        NavigableMap<byte[], Value> entries = revsIdx.get(lrev);
+        if (rev == LATEST_REV)
+            lastRev = lastRevision(revs);
+        else
+            lastRev = exactRev ? rev : maxRevision(revs, rev);
 
-        if (entries == null || entries.isEmpty())
+        // lastRev can be -1 if maxRevision return -1.
+        if (lastRev == -1)
             return Entry.empty(key);
 
-        Value val = entries.get(key);
+        return doGetValue(key, lastRev);
+    }
+
+    /**
+     * Scans the revisions list from its tail and returns the first revision which is less or equal to
+     * {@code upperBoundRev}. For a list sorted in ascending order this is the maximum revision which does not
+     * exceed the bound.
+     *
+     * @param revs Revisions list.
+     * @param upperBoundRev Revision upper bound (inclusive).
+     * @return Found revision or {@code -1} if the list has no revision less or equal to the bound.
+     */
+    private static long maxRevision(List<Long> revs, long upperBoundRev) {
+        int idx = revs.size();
+
+        // Walk backwards: the first hit from the tail is the answer.
+        while (--idx >= 0) {
+            long candidate = revs.get(idx);
+
+            if (candidate <= upperBoundRev)
+                return candidate;
+        }
+
+        return -1;
+    }
+
+    /**
+     * Builds an entry for the given key from the values stored under the given revision.
+     *
+     * @param key Key.
+     * @param lastRev Revision to read the value from, {@code 0} is treated as "no revision".
+     * @return Empty entry if the revision is {@code 0}, if nothing is stored under the revision or if the revision
+     *      holds no value for the key; tombstone entry if the stored value is a tombstone; regular entry otherwise.
+     */
+    @NotNull
+    private Entry doGetValue(byte[] key, long lastRev) {
+        if (lastRev == 0)
+            return Entry.empty(key);
+
+        NavigableMap<byte[], Value> lastRevVals = revsIdx.get(lastRev);
+
+        if (lastRevVals == null || lastRevVals.isEmpty())
+            return Entry.empty(key);
+
+        Value lastVal = lastRevVals.get(key);
+
+        // An exact revision requested by a caller may hold values of other keys only.
+        if (lastVal == null)
+            return Entry.empty(key);
+
+        if (lastVal.tombstone())
+            return Entry.tombstone(key, lastRev, lastVal.updateCounter());
+
+        return new Entry(key, lastVal.bytes(), lastRev, lastVal.updateCounter());
+    }
+
+    private long doPut(byte[] key, byte[] bytes) {
+        long curRev = ++rev;
+
+        long curUpdCntr = ++updCntr;
+
+        // Update keysIdx.
+        List<Long> revs = keysIdx.computeIfAbsent(key, k -> new ArrayList<>());
+
+        long lastRev = revs.isEmpty() ? 0 : lastRevision(revs);
+
+        revs.add(curRev);
+
+        // Update revsIdx.
+        NavigableMap<byte[], Value> entries = new TreeMap<>(LEXICOGRAPHIC_COMPARATOR);
+
+        Value val = new Value(bytes, curUpdCntr);
+
+        entries.put(key, val);
 
-        if (val.tombstone())
-            return Entry.tombstone(key, lrev, val.updateCounter());
+        revsIdx.put(curRev, entries);
 
-        return new Entry(key, val.bytes() , lrev, val.updateCounter());
+        return lastRev;
     }
 
     private static boolean isPrefix(byte[] pref, byte[] term) {
diff --git a/modules/metastorage-server/src/test/java/org/apache/ignite/internal/metastorage/server/SimpleInMemoryKeyValueStorageTest.java b/modules/metastorage-server/src/test/java/org/apache/ignite/internal/metastorage/server/SimpleInMemoryKeyValueStorageTest.java
index eae76fd..5b797fc 100644
--- a/modules/metastorage-server/src/test/java/org/apache/ignite/internal/metastorage/server/SimpleInMemoryKeyValueStorageTest.java
+++ b/modules/metastorage-server/src/test/java/org/apache/ignite/internal/metastorage/server/SimpleInMemoryKeyValueStorageTest.java
@@ -22,7 +22,212 @@ class SimpleInMemoryKeyValueStorageTest {
     }
 
     @Test
-    void putGetRemoveCompact() {
+    public void put() {
+        byte[] key = k(1);
+        byte[] val = kv(1, 1);
+
+        assertEquals(0, storage.revision());
+        assertEquals(0, storage.updateCounter());
+        assertTrue(storage.get(key).empty());
+
+        storage.put(key, val);
+
+        assertEquals(1, storage.revision());
+        assertEquals(1, storage.updateCounter());
+
+        Entry e = storage.get(key);
+
+        assertFalse(e.empty());
+        assertFalse(e.tombstone());
+        assertEquals(1, e.revision());
+        assertEquals(1, e.updateCounter());
+
+        storage.put(key, val);
+
+        assertEquals(2, storage.revision());
+        assertEquals(2, storage.updateCounter());
+
+        e = storage.get(key);
+
+        assertFalse(e.empty());
+        assertFalse(e.tombstone());
+        assertEquals(2, e.revision());
+        assertEquals(2, e.updateCounter());
+    }
+
+    @Test
+    public void getAndPut() {
+        byte[] key = k(1);
+        byte[] val = kv(1, 1);
+
+        assertEquals(0, storage.revision());
+        assertEquals(0, storage.updateCounter());
+        assertTrue(storage.get(key).empty());
+
+        Entry e = storage.getAndPut(key, val);
+
+        assertEquals(1, storage.revision());
+        assertEquals(1, storage.updateCounter());
+        assertTrue(e.empty());
+        assertFalse(e.tombstone());
+        assertEquals(0, e.revision());
+        assertEquals(0, e.updateCounter());
+
+        e = storage.getAndPut(key, val);
+
+        assertEquals(2, storage.revision());
+        assertEquals(2, storage.updateCounter());
+        assertFalse(e.empty());
+        assertFalse(e.tombstone());
+        assertEquals(1, e.revision());
+        assertEquals(1, e.updateCounter());
+    }
+
+    @Test
+    public void remove() {
+        byte[] key = k(1);
+        byte[] val = kv(1, 1);
+
+        assertEquals(0, storage.revision());
+        assertEquals(0, storage.updateCounter());
+        assertTrue(storage.get(key).empty());
+
+        // Remove non-existent entry.
+        storage.remove(key);
+
+        assertEquals(0, storage.revision());
+        assertEquals(0, storage.updateCounter());
+        assertTrue(storage.get(key).empty());
+
+        storage.put(key, val);
+
+        assertEquals(1, storage.revision());
+        assertEquals(1, storage.updateCounter());
+
+        // Remove existent entry.
+        storage.remove(key);
+
+        assertEquals(2, storage.revision());
+        assertEquals(2, storage.updateCounter());
+
+        Entry e = storage.get(key);
+
+        assertFalse(e.empty());
+        assertTrue(e.tombstone());
+        assertEquals(2, e.revision());
+        assertEquals(2, e.updateCounter());
+
+        // Remove already removed entry (tombstone can't be removed).
+        storage.remove(key);
+
+        assertEquals(2, storage.revision());
+        assertEquals(2, storage.updateCounter());
+
+        e = storage.get(key);
+
+        assertFalse(e.empty());
+        assertTrue(e.tombstone());
+        assertEquals(2, e.revision());
+        assertEquals(2, e.updateCounter());
+    }
+
+    @Test
+    public void getAndRemove() {
+        byte[] key = k(1);
+        byte[] val = kv(1, 1);
+
+        assertEquals(0, storage.revision());
+        assertEquals(0, storage.updateCounter());
+        assertTrue(storage.get(key).empty());
+
+        // Remove non-existent entry.
+        Entry e = storage.getAndRemove(key);
+
+        assertTrue(e.empty());
+        assertEquals(0, storage.revision());
+        assertEquals(0, storage.updateCounter());
+        assertTrue(storage.get(key).empty());
+
+        storage.put(key, val);
+
+        assertEquals(1, storage.revision());
+        assertEquals(1, storage.updateCounter());
+
+        // Remove existent entry.
+        e = storage.getAndRemove(key);
+
+        assertFalse(e.empty());
+        assertFalse(e.tombstone());
+        assertEquals(1, e.revision());
+        assertEquals(1, e.updateCounter());
+        assertEquals(2, storage.revision());
+        assertEquals(2, storage.updateCounter());
+
+        e = storage.get(key);
+
+        assertFalse(e.empty());
+        assertTrue(e.tombstone());
+        assertEquals(2, e.revision());
+        assertEquals(2, e.updateCounter());
+
+        // Remove already removed entry (tombstone can't be removed).
+        e = storage.getAndRemove(key);
+
+        assertFalse(e.empty());
+        assertTrue(e.tombstone());
+        assertEquals(2, e.revision());
+        assertEquals(2, e.updateCounter());
+        assertEquals(2, storage.revision());
+        assertEquals(2, storage.updateCounter());
+
+        e = storage.get(key);
+
+        assertFalse(e.empty());
+        assertTrue(e.tombstone());
+        assertEquals(2, e.revision());
+        assertEquals(2, e.updateCounter());
+    }
+
+    @Test
+    public void getAfterRemove() {
+        byte[] key = k(1);
+        byte[] val = kv(1, 1);
+
+        storage.getAndPut(key, val);
+
+        storage.getAndRemove(key);
+
+        Entry e = storage.get(key);
+
+        assertEquals(2, storage.revision());
+        assertEquals(2, storage.updateCounter());
+        assertEquals(2, e.revision());
+        assertTrue(e.tombstone());
+    }
+
+    @Test
+    public void getAndPutAfterRemove() {
+        byte[] key = k(1);
+
+        byte[] val = kv(1, 1);
+
+        storage.getAndPut(key, val);
+
+        storage.getAndRemove(key);
+
+        Entry e = storage.getAndPut(key, val);
+
+        assertEquals(3, storage.revision());
+
+        assertEquals(3, storage.updateCounter());
+
+        assertEquals(2, e.revision());
+
+        assertTrue(e.tombstone());
+    }
+
+    @Test
+    public void putGetRemoveCompact() {
         byte[] key1 = k(1);
         byte[] val1_1 = kv(1, 1);
         byte[] val1_3 = kv(1, 3);
@@ -34,7 +239,7 @@ class SimpleInMemoryKeyValueStorageTest {
         assertEquals(0, storage.updateCounter());
 
         // Previous entry is empty.
-        Entry emptyEntry = storage.put(key1, val1_1);
+        Entry emptyEntry = storage.getAndPut(key1, val1_1);
 
         assertEquals(1, storage.revision());
         assertEquals(1, storage.updateCounter());
@@ -53,7 +258,7 @@ class SimpleInMemoryKeyValueStorageTest {
         assertEquals(1, storage.updateCounter());
 
         // Previous entry is empty.
-        emptyEntry = storage.put(key2, val2_2);
+        emptyEntry = storage.getAndPut(key2, val2_2);
 
         assertEquals(2, storage.revision());
         assertEquals(2, storage.updateCounter());
@@ -72,7 +277,7 @@ class SimpleInMemoryKeyValueStorageTest {
         assertEquals(2, storage.updateCounter());
 
         // Previous entry is not empty.
-        e1_1 = storage.put(key1, val1_3);
+        e1_1 = storage.getAndPut(key1, val1_3);
 
         assertFalse(e1_1.empty());
         assertFalse(e1_1.tombstone());
@@ -96,7 +301,7 @@ class SimpleInMemoryKeyValueStorageTest {
         assertEquals(3, storage.updateCounter());
 
         // Remove existing entry.
-        Entry e2_2 = storage.remove(key2);
+        Entry e2_2 = storage.getAndRemove(key2);
 
         assertFalse(e2_2.empty());
         assertFalse(e2_2.tombstone());
@@ -108,7 +313,7 @@ class SimpleInMemoryKeyValueStorageTest {
         assertEquals(4, storage.updateCounter());
 
         // Remove already removed entry.
-        Entry tombstoneEntry = storage.remove(key2);
+        Entry tombstoneEntry = storage.getAndRemove(key2);
 
         assertFalse(tombstoneEntry.empty());
         assertTrue(tombstoneEntry.tombstone());
@@ -120,11 +325,11 @@ class SimpleInMemoryKeyValueStorageTest {
 
         assertEquals(4, storage.revision());
         assertEquals(4, storage.updateCounter());
-        assertTrue(storage.remove(key2).empty());
+        assertTrue(storage.getAndRemove(key2).empty());
         assertTrue(storage.get(key2).empty());
 
         // Remove existing entry.
-        e1_3 = storage.remove(key1);
+        e1_3 = storage.getAndRemove(key1);
 
         assertFalse(e1_3.empty());
         assertFalse(e1_3.tombstone());
@@ -136,7 +341,7 @@ class SimpleInMemoryKeyValueStorageTest {
         assertEquals(5, storage.updateCounter());
 
         // Remove already removed entry.
-        tombstoneEntry = storage.remove(key1);
+        tombstoneEntry = storage.getAndRemove(key1);
 
         assertFalse(tombstoneEntry.empty());
         assertTrue(tombstoneEntry.tombstone());
@@ -148,12 +353,12 @@ class SimpleInMemoryKeyValueStorageTest {
 
         assertEquals(5, storage.revision());
         assertEquals(5, storage.updateCounter());
-        assertTrue(storage.remove(key1).empty());
+        assertTrue(storage.getAndRemove(key1).empty());
         assertTrue(storage.get(key1).empty());
     }
 
     @Test
-    void compact() {
+    public void compact() {
         assertEquals(0, storage.revision());
         assertEquals(0, storage.updateCounter());
 
@@ -179,7 +384,7 @@ class SimpleInMemoryKeyValueStorageTest {
         assertEquals(6, storage.revision());
         assertEquals(6, storage.updateCounter());
 
-        storage.remove(k(3));
+        storage.getAndRemove(k(3));
 
         assertEquals(7, storage.revision());
         assertEquals(7, storage.updateCounter());
@@ -218,7 +423,7 @@ class SimpleInMemoryKeyValueStorageTest {
     }
 
     @Test
-    void iterate() {
+    public void iterate() {
         TreeMap<String, String> expFooMap = new TreeMap<>();
         TreeMap<String, String> expKeyMap = new TreeMap<>();
         TreeMap<String, String> expZooMap = new TreeMap<>();
@@ -270,13 +475,13 @@ class SimpleInMemoryKeyValueStorageTest {
 
             byte[] val = valStr.getBytes();
 
-            storage.put(key, val);
+            storage.getAndPut(key, val);
         }
     }
 
     private static void fill(KeyValueStorage storage, int keySuffix, int num) {
         for (int i = 0; i < num; i++)
-            storage.put(k(keySuffix), kv(keySuffix, i + 1));
+            storage.getAndPut(k(keySuffix), kv(keySuffix, i + 1));
     }
 
     private static byte[] k(int k) {

[ignite-3] 08/08: IGNITE-14389 Implemented conditional update (invoke)

Posted by ag...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

agura pushed a commit to branch ignite-14389
in repository https://gitbox.apache.org/repos/asf/ignite-3.git

commit 73176e2350a47f77e4ea8f086746e7bfca2bf28d
Author: Andrey Gura <ag...@apache.org>
AuthorDate: Tue May 4 18:37:10 2021 +0300

    IGNITE-14389 Implemented conditional update (invoke)
---
 .../ignite/internal/affinity/AffinityManager.java  |   8 +-
 .../metastorage/client/MetaStorageService.java     |  27 +-
 .../ignite/metastorage/common/Condition.java       | 133 +++------
 .../ignite/metastorage/common/Conditions.java      |  10 +-
 .../ignite/metastorage/common/Operation.java       |  35 ++-
 .../ignite/metastorage/common/Operations.java      |  10 +-
 modules/metastorage-server/pom.xml                 |   6 +-
 .../metastorage/server/AbstractCondition.java      |  13 +
 .../internal/metastorage/server/Condition.java     |   7 +
 .../metastorage/server/KeyValueStorage.java        |   2 +
 .../internal/metastorage/server/Operation.java     |  31 ++
 .../metastorage/server/RevisionCondition.java      |  75 +++++
 .../server/SimpleInMemoryKeyValueStorage.java      |  95 +++++-
 .../metastorage/server/ValueCondition.java         |  49 ++++
 .../metastorage/server/RevisionConditionTest.java  |  65 +++++
 .../server/SimpleInMemoryKeyValueStorageTest.java  | 323 +++++++++++++++++++++
 .../metastorage/server/ValueConditionTest.java     |  27 ++
 .../internal/metastorage/MetaStorageManager.java   |  10 +-
 .../apache/ignite/internal/app/IgnitionImpl.java   |  51 ++--
 .../internal/table/distributed/TableManager.java   |   8 +-
 20 files changed, 817 insertions(+), 168 deletions(-)

diff --git a/modules/affinity/src/main/java/org/apache/ignite/internal/affinity/AffinityManager.java b/modules/affinity/src/main/java/org/apache/ignite/internal/affinity/AffinityManager.java
index 937af33..61cf7c5 100644
--- a/modules/affinity/src/main/java/org/apache/ignite/internal/affinity/AffinityManager.java
+++ b/modules/affinity/src/main/java/org/apache/ignite/internal/affinity/AffinityManager.java
@@ -149,9 +149,11 @@ public class AffinityManager {
                             int replicas = configurationMgr.configurationRegistry().getConfiguration(TablesConfiguration.KEY)
                                 .tables().get(name).replicas().value();
 
-                            metaStorageMgr.invoke(evt.newEntry().key(),
-                                Conditions.value().eq(evt.newEntry().value()),
-                                Operations.put(ByteUtils.toBytes(
+                            Key key = evt.newEntry().key();
+
+                            metaStorageMgr.invoke(
+                                Conditions.value(key).eq(evt.newEntry().value()),
+                                Operations.put(key, ByteUtils.toBytes(
                                     RendezvousAffinityFunction.assignPartitions(
                                         baselineMgr.nodes(),
                                         partitions,
diff --git a/modules/metastorage-client/src/main/java/org/apache/ignite/metastorage/client/MetaStorageService.java b/modules/metastorage-client/src/main/java/org/apache/ignite/metastorage/client/MetaStorageService.java
index d4ded44..409f90d 100644
--- a/modules/metastorage-client/src/main/java/org/apache/ignite/metastorage/client/MetaStorageService.java
+++ b/modules/metastorage-client/src/main/java/org/apache/ignite/metastorage/client/MetaStorageService.java
@@ -202,7 +202,6 @@ public interface MetaStorageService {
      *
      * <p>Conditional update could be treated as <i>if(condition)-then(success)-else(failure)</i> expression.</p>
      *
-     * @param key The key. Couldn't be {@code null}.
      * @param condition The condition.
      * @param success The update which will be applied in case of condition evaluation yields {@code true}.
      * @param failure The update which will be applied in case of condition evaluation yields {@code false}.
@@ -215,7 +214,7 @@ public interface MetaStorageService {
      */
     // TODO: https://issues.apache.org/jira/browse/IGNITE-14269: will be replaced by conditional multi update.
     @NotNull
-    CompletableFuture<Boolean> invoke(@NotNull Key key, @NotNull Condition condition,
+    CompletableFuture<Boolean> invoke(@NotNull Condition condition,
                                       @NotNull Operation success, @NotNull Operation failure);
 
     /**
@@ -223,7 +222,27 @@ public interface MetaStorageService {
      *
      * <p>Conditional update could be treated as <i>if(condition)-then(success)-else(failure)</i> expression.</p>
      *
-     * @param key The key. Couldn't be {@code null}.
+     * @param condition The condition.
+     * @param success The updates which will be applied in case of condition evaluation yields {@code true}.
+     * @param failure The updates which will be applied in case of condition evaluation yields {@code false}.
+     * @return Future result {@code true} if {@code success} update was applied, otherwise {@code false}.
+     * @throws OperationTimeoutException If the operation is timed out. Will be thrown on getting future result.
+     * @see Key
+     * @see Entry
+     * @see Condition
+     * @see Operation
+     */
+    // TODO: https://issues.apache.org/jira/browse/IGNITE-14269: will be replaced by conditional multi update.
+    @NotNull
+    CompletableFuture<Boolean> invoke(@NotNull Condition condition,
+                                      @NotNull Collection<Operation> success, @NotNull Collection<Operation> failure);
+
+
+    /**
+     * Updates an entry for the given key conditionally.
+     *
+     * <p>Conditional update could be treated as <i>if(condition)-then(success)-else(failure)</i> expression.</p>
+     *
      * @param condition The condition.
      * @param success The update which will be applied in case of condition evaluation yields {@code true}.
      * @param failure The update which will be applied in case of condition evaluation yields {@code false}.
@@ -236,7 +255,7 @@ public interface MetaStorageService {
      */
     //TODO: https://issues.apache.org/jira/browse/IGNITE-14269: will be replaced by conditional multi update.
     @NotNull
-    CompletableFuture<Entry> getAndInvoke(@NotNull Key key, @NotNull Condition condition,
+    CompletableFuture<Entry> getAndInvoke(@NotNull Condition condition,
                                           @NotNull Operation success, @NotNull Operation failure);
 
     /**
diff --git a/modules/metastorage-common/src/main/java/org/apache/ignite/metastorage/common/Condition.java b/modules/metastorage-common/src/main/java/org/apache/ignite/metastorage/common/Condition.java
index 359829b..f7ab099 100644
--- a/modules/metastorage-common/src/main/java/org/apache/ignite/metastorage/common/Condition.java
+++ b/modules/metastorage-common/src/main/java/org/apache/ignite/metastorage/common/Condition.java
@@ -17,8 +17,6 @@
 
 package org.apache.ignite.metastorage.common;
 
-import java.util.Arrays;
-
 /**
  * Represents a condition for conditional update.
  */
@@ -36,22 +34,11 @@ public final class Condition {
     }
 
     /**
-     * Tests the given entry on satisfaction of the condition.
-     *
-     * @param e Entry.
-     * @return The result of condition test. {@code true} - if the entry satisfies to the condition,
-     * otherwise - {@code false}.
-     */
-    public boolean test(Entry e) {
-        return cond.test(e);
-    }
-
-    /**
      * Represents condition on entry revision. Only one type of condition could be applied to
      * the one instance of condition. Subsequent invocations of any method which produces condition will throw
      * {@link IllegalStateException}.
      */
-    public static final class RevisionCondition implements InnerCondition {
+    public static final class RevisionCondition extends AbstractCondition {
         /**
          * The type of condition.
          *
@@ -65,10 +52,12 @@ public final class Condition {
         private long rev;
 
         /**
-         * Default no-op constructor.
+         * Constructs a condition by a revision for an entry identified by the given key.
+         *
+         * @param key Identifies an entry which condition will be applied to.
          */
-        RevisionCondition() {
-            // No-op.
+        RevisionCondition(byte[] key) {
+            super(key);
         }
 
         /**
@@ -173,66 +162,27 @@ public final class Condition {
             return new Condition(this);
         }
 
-        /** {@inheritDoc} */
-        @Override public boolean test(Entry e) {
-            int res = Long.compare(e.revision(), rev);
-
-            return type.test(res);
-        }
-
         /**
          * Defines possible condition types which can be applied to the revision.
          */
         enum Type {
             /** Equality condition type. */
-            EQUAL {
-                @Override public boolean test(long res) {
-                    return res == 0;
-                }
-            },
+            EQUAL,
 
             /** Inequality condition type. */
-            NOT_EQUAL {
-                @Override public boolean test(long res) {
-                    return res != 0;
-                }
-            },
+            NOT_EQUAL,
 
             /** Greater than condition type. */
-            GREATER {
-                @Override public boolean test(long res) {
-                    return res > 0;
-                }
-            },
+            GREATER,
 
             /** Less than condition type. */
-            LESS {
-                @Override public boolean test(long res) {
-                    return res < 0;
-                }
-            },
+            LESS,
 
             /** Less than or equal to condition type. */
-            LESS_OR_EQUAL {
-                @Override public boolean test(long res) {
-                    return res <= 0;
-                }
-            },
+            LESS_OR_EQUAL,
 
             /** Greater than or equal to condition type. */
-            GREATER_OR_EQUAL {
-                @Override public boolean test(long res) {
-                    return res >= 0;
-                }
-            };
-
-            /**
-             * Interprets comparison result.
-             *
-             * @param res The result of comparison.
-             * @return The interpretation of the comparison result.
-             */
-            public abstract boolean test(long res);
+            GREATER_OR_EQUAL
         }
     }
 
@@ -241,7 +191,7 @@ public final class Condition {
      * the one instance of condition. Subsequent invocations of any method which produces condition will throw
      * {@link IllegalStateException}.
      */
-    public static final class ValueCondition implements InnerCondition {
+    public static final class ValueCondition extends AbstractCondition {
         /**
          * The type of condition.
          *
@@ -255,10 +205,12 @@ public final class Condition {
         private byte[] val;
 
         /**
-         * Default no-op constructor.
+         * Constructs a condition by a value for an entry identified by the given key.
+         *
+         * @param key Identifies an entry which condition will be applied to.
          */
-        ValueCondition() {
-            // No-op.
+        ValueCondition(byte[] key) {
+            super(key);
         }
 
         /**
@@ -295,38 +247,15 @@ public final class Condition {
             return new Condition(this);
         }
 
-        /** {@inheritDoc} */
-        @Override public boolean test(Entry e) {
-            int res = Arrays.compare(e.value(), val);
-
-            return type.test(res);
-        }
-
         /**
          * Defines possible condition types which can be applied to the value.
          */
         enum Type {
             /** Equality condition type. */
-            EQUAL {
-                @Override public boolean test(long res) {
-                    return res == 0;
-                }
-            },
+            EQUAL,
 
             /** Inequality condition type. */
-            NOT_EQUAL {
-                @Override public boolean test(long res) {
-                    return res != 0;
-                }
-            };
-
-            /**
-             * Interprets comparison result.
-             *
-             * @param res The result of comparison.
-             * @return The interpretation of the comparison result.
-             */
-            public abstract boolean test(long res);
+            NOT_EQUAL
         }
     }
 
@@ -343,14 +272,24 @@ public final class Condition {
     /**
      * Defines condition interface.
      */
-    private interface InnerCondition {
+    public interface InnerCondition {
         /**
-         * Tests the given entry on satisfaction of the condition.
+         * Returns key which identifies an entry which condition will be applied to.
          *
-         * @param e Entry.
-         * @return The result of condition test. {@code true} - if the entry satisfies to the condition,
-         * otherwise - {@code false}.
+         * @return Key which identifies an entry which condition will be applied to.
          */
-        boolean test(Entry e);
+        byte[] key();
+    }
+
+    private static abstract class AbstractCondition implements InnerCondition {
+        private final byte[] key;
+
+        public AbstractCondition(byte[] key) {
+            this.key = key;
+        }
+
+        @Override public byte[] key() {
+            return key;
+        }
     }
 }
diff --git a/modules/metastorage-common/src/main/java/org/apache/ignite/metastorage/common/Conditions.java b/modules/metastorage-common/src/main/java/org/apache/ignite/metastorage/common/Conditions.java
index f83849a..567b20b 100644
--- a/modules/metastorage-common/src/main/java/org/apache/ignite/metastorage/common/Conditions.java
+++ b/modules/metastorage-common/src/main/java/org/apache/ignite/metastorage/common/Conditions.java
@@ -27,21 +27,23 @@ public final class Conditions {
     /**
      * Creates condition on entry revision.
      *
+     * @param key Identifies an entry which condition will be applied to.
      * @return Condition on entry revision.
      * @see Condition.RevisionCondition
      */
-    public static Condition.RevisionCondition revision() {
-        return new Condition.RevisionCondition();
+    public static Condition.RevisionCondition revision(Key key) {
+        return new Condition.RevisionCondition(key.bytes());
     }
 
     /**
      * Creates condition on entry value.
      *
+     * @param key Identifies an entry which condition will be applied to.
      * @return Condition on entry value.
      * @see Condition.ValueCondition
      */
-    public static Condition.ValueCondition value() {
-        return new Condition.ValueCondition();
+    public static Condition.ValueCondition value(Key key) {
+        return new Condition.ValueCondition(key.bytes());
     }
 
     /**
diff --git a/modules/metastorage-common/src/main/java/org/apache/ignite/metastorage/common/Operation.java b/modules/metastorage-common/src/main/java/org/apache/ignite/metastorage/common/Operation.java
index e085a28..57329f4 100644
--- a/modules/metastorage-common/src/main/java/org/apache/ignite/metastorage/common/Operation.java
+++ b/modules/metastorage-common/src/main/java/org/apache/ignite/metastorage/common/Operation.java
@@ -17,6 +17,8 @@
 
 package org.apache.ignite.metastorage.common;
 
+import org.jetbrains.annotations.Nullable;
+
 /**
  * Defines operation for meta storage conditional update (invoke).
  */
@@ -37,28 +39,32 @@ public final class Operation {
     /**
      * Represents operation of type <i>remove</i>.
      */
-    public static final class RemoveOp implements InnerOp {
+    public static final class RemoveOp extends AbstractOp {
         /**
          * Default no-op constructor.
+         *
+         * @param key Identifies an entry which operation will be applied to.
          */
-        RemoveOp() {
-            // No-op.
+        RemoveOp(byte[] key) {
+            super(key);
         }
     }
 
     /**
      * Represents operation of type <i>put</i>.
      */
-    public static final class PutOp implements InnerOp {
+    public static final class PutOp extends AbstractOp {
         /** Value. */
         private final byte[] val;
 
         /**
          * Constructs operation of type <i>put</i>.
          *
+         * @param key Identifies an entry which operation will be applied to.
          * @param val The value to which the entry should be updated.
          */
-        PutOp(byte[] val) {
+        PutOp(byte[] key, byte[] val) {
+            super(key);
             this.val = val;
         }
     }
@@ -66,12 +72,12 @@ public final class Operation {
     /**
      * Represents operation of type <i>no-op</i>.
      */
-    public static final class NoOp implements InnerOp {
+    public static final class NoOp extends AbstractOp {
         /**
          * Default no-op constructor.
          */
         NoOp() {
-            // No-op.
+            super(null);
         }
     }
 
@@ -79,6 +85,19 @@ public final class Operation {
      * Defines operation interface.
      */
     private interface InnerOp {
-        // Marker interface.
+        @Nullable byte[] key();
+    }
+
+    private static class AbstractOp implements InnerOp {
+        @Nullable private final byte[] key;
+
+        public AbstractOp(@Nullable byte[] key) {
+            this.key = key;
+        }
+
+        @Nullable
+        @Override public byte[] key() {
+            return key;
+        }
     }
 }
diff --git a/modules/metastorage-common/src/main/java/org/apache/ignite/metastorage/common/Operations.java b/modules/metastorage-common/src/main/java/org/apache/ignite/metastorage/common/Operations.java
index 994f4bd..e51fa9e 100644
--- a/modules/metastorage-common/src/main/java/org/apache/ignite/metastorage/common/Operations.java
+++ b/modules/metastorage-common/src/main/java/org/apache/ignite/metastorage/common/Operations.java
@@ -30,20 +30,22 @@ public final class Operations {
     /**
      * Creates operation of type <i>remove</i>. This type of operation removes entry.
      *
+     * @param key Identifies an entry which operation will be applied to.
      * @return Operation of type <i>remove</i>.
      */
-    public static Operation remove() {
-        return new Operation(new Operation.RemoveOp());
+    public static Operation remove(Key key) {
+        return new Operation(new Operation.RemoveOp(key.bytes()));
     }
 
     /**
      * Creates operation of type <i>put</i>. This type of operation inserts or updates value of entry.
      *
+     * @param key Identifies an entry which operation will be applied to.
      * @param value Value.
      * @return Operation of type <i>put</i>.
      */
-    public static Operation put(byte[] value) {
-        return new Operation(new Operation.PutOp(value));
+    public static Operation put(Key key, byte[] value) {
+        return new Operation(new Operation.PutOp(key.bytes(), value));
     }
 
     /**
diff --git a/modules/metastorage-server/pom.xml b/modules/metastorage-server/pom.xml
index d73d080..5f0f453 100644
--- a/modules/metastorage-server/pom.xml
+++ b/modules/metastorage-server/pom.xml
@@ -29,20 +29,18 @@
         <relativePath>../../parent/pom.xml</relativePath>
     </parent>
 
-    <artifactId>metastorage-server</artifactId>
+    <artifactId>ignite-metastorage-server</artifactId>
     <version>3.0.0-SNAPSHOT</version>
 
     <dependencies>
         <dependency>
             <groupId>org.apache.ignite</groupId>
             <artifactId>ignite-core</artifactId>
-            <version>${project.version}</version>
         </dependency>
 
         <dependency>
             <groupId>org.apache.ignite</groupId>
-            <artifactId>metastorage-common</artifactId>
-            <version>${project.version}</version>
+            <artifactId>ignite-metastorage-common</artifactId>
         </dependency>
 
         <dependency>
diff --git a/modules/metastorage-server/src/main/java/org/apache/ignite/internal/metastorage/server/AbstractCondition.java b/modules/metastorage-server/src/main/java/org/apache/ignite/internal/metastorage/server/AbstractCondition.java
new file mode 100644
index 0000000..55ec9e6
--- /dev/null
+++ b/modules/metastorage-server/src/main/java/org/apache/ignite/internal/metastorage/server/AbstractCondition.java
@@ -0,0 +1,13 @@
+package org.apache.ignite.internal.metastorage.server;
+
+public abstract class AbstractCondition implements Condition {
+    private final byte[] key;
+
+    public AbstractCondition(byte[] key) {
+        this.key = key;
+    }
+
+    @Override public byte[] key() {
+        return key;
+    }
+}
diff --git a/modules/metastorage-server/src/main/java/org/apache/ignite/internal/metastorage/server/Condition.java b/modules/metastorage-server/src/main/java/org/apache/ignite/internal/metastorage/server/Condition.java
new file mode 100644
index 0000000..ea4fcc7
--- /dev/null
+++ b/modules/metastorage-server/src/main/java/org/apache/ignite/internal/metastorage/server/Condition.java
@@ -0,0 +1,7 @@
+package org.apache.ignite.internal.metastorage.server;
+
+public interface Condition {
+    byte[] key();
+
+    boolean test(Entry e);
+}
diff --git a/modules/metastorage-server/src/main/java/org/apache/ignite/internal/metastorage/server/KeyValueStorage.java b/modules/metastorage-server/src/main/java/org/apache/ignite/internal/metastorage/server/KeyValueStorage.java
index 5d6da44..5659d3e 100644
--- a/modules/metastorage-server/src/main/java/org/apache/ignite/internal/metastorage/server/KeyValueStorage.java
+++ b/modules/metastorage-server/src/main/java/org/apache/ignite/internal/metastorage/server/KeyValueStorage.java
@@ -59,6 +59,8 @@ public interface KeyValueStorage {
     @NotNull
     Collection<Entry> getAndRemoveAll(List<byte[]> keys);
 
+    boolean invoke(Condition condition, Collection<Operation> success, Collection<Operation> failure);
+
     Cursor<Entry> range(byte[] keyFrom, byte[] keyTo);
 
     Cursor<Entry> range(byte[] keyFrom, byte[] keyTo, long revUpperBound);
diff --git a/modules/metastorage-server/src/main/java/org/apache/ignite/internal/metastorage/server/Operation.java b/modules/metastorage-server/src/main/java/org/apache/ignite/internal/metastorage/server/Operation.java
new file mode 100644
index 0000000..2d1fbfd
--- /dev/null
+++ b/modules/metastorage-server/src/main/java/org/apache/ignite/internal/metastorage/server/Operation.java
@@ -0,0 +1,31 @@
+package org.apache.ignite.internal.metastorage.server;
+
+public class Operation {
+    private final byte[] key;
+    private final byte[] val;
+    private final Type type;
+
+    public Operation(Type type, byte[] key, byte[] val) {
+        this.key = key;
+        this.val = val;
+        this.type = type;
+    }
+
+    byte[] key() {
+        return key;
+    }
+
+    byte[] value() {
+        return val;
+    }
+
+    Type type() {
+        return type;
+    }
+
+    enum Type {
+        PUT,
+        REMOVE,
+        NO_OP
+    }
+}
diff --git a/modules/metastorage-server/src/main/java/org/apache/ignite/internal/metastorage/server/RevisionCondition.java b/modules/metastorage-server/src/main/java/org/apache/ignite/internal/metastorage/server/RevisionCondition.java
new file mode 100644
index 0000000..95a0137
--- /dev/null
+++ b/modules/metastorage-server/src/main/java/org/apache/ignite/internal/metastorage/server/RevisionCondition.java
@@ -0,0 +1,75 @@
+package org.apache.ignite.internal.metastorage.server;
+
+public class RevisionCondition extends AbstractCondition {
+    private final Type type;
+
+    private final long rev;
+
+    public RevisionCondition(Type type, byte[] key, long rev) {
+        super(key);
+        this.type = type;
+        this.rev = rev;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean test(Entry e) {
+        int res = Long.compare(e.revision(), rev);
+
+        return type.test(res);
+    }
+
+    /**
+     * Defines possible condition types which can be applied to the revision.
+     */
+    public enum Type {
+        /** Equality condition type. */
+        EQUAL {
+            @Override public boolean test(long res) {
+                return res == 0;
+            }
+        },
+
+        /** Inequality condition type. */
+        NOT_EQUAL {
+            @Override public boolean test(long res) {
+                return res != 0;
+            }
+        },
+
+        /** Greater than condition type. */
+        GREATER {
+            @Override public boolean test(long res) {
+                return res > 0;
+            }
+        },
+
+        /** Less than condition type. */
+        LESS {
+            @Override public boolean test(long res) {
+                return res < 0;
+            }
+        },
+
+        /** Less than or equal to condition type. */
+        LESS_OR_EQUAL {
+            @Override public boolean test(long res) {
+                return res <= 0;
+            }
+        },
+
+        /** Greater than or equal to condition type. */
+        GREATER_OR_EQUAL {
+            @Override public boolean test(long res) {
+                return res >= 0;
+            }
+        };
+
+        /**
+         * Interprets comparison result.
+         *
+         * @param res The result of comparison.
+         * @return The interpretation of the comparison result.
+         */
+        public abstract boolean test(long res);
+    }
+}
diff --git a/modules/metastorage-server/src/main/java/org/apache/ignite/internal/metastorage/server/SimpleInMemoryKeyValueStorage.java b/modules/metastorage-server/src/main/java/org/apache/ignite/internal/metastorage/server/SimpleInMemoryKeyValueStorage.java
index b37c96a..3033f2c 100644
--- a/modules/metastorage-server/src/main/java/org/apache/ignite/internal/metastorage/server/SimpleInMemoryKeyValueStorage.java
+++ b/modules/metastorage-server/src/main/java/org/apache/ignite/internal/metastorage/server/SimpleInMemoryKeyValueStorage.java
@@ -29,6 +29,7 @@ import java.util.NoSuchElementException;
 import java.util.TreeMap;
 import java.util.TreeSet;
 import java.util.function.Predicate;
+
 import org.apache.ignite.metastorage.common.Cursor;
 import org.jetbrains.annotations.NotNull;
 import org.jetbrains.annotations.TestOnly;
@@ -63,14 +64,22 @@ public class SimpleInMemoryKeyValueStorage implements KeyValueStorage {
 
     @Override public void put(byte[] key, byte[] value) {
         synchronized (mux) {
-            doPut(key, value);
+            long curRev = rev + 1;
+
+            doPut(key, value, curRev);
+
+            rev = curRev;
         }
     }
 
     @NotNull
     @Override public Entry getAndPut(byte[] key, byte[] bytes) {
         synchronized (mux) {
-            long lastRev = doPut(key, bytes);
+            long curRev = rev + 1;
+
+            long lastRev = doPut(key, bytes, curRev);
+
+            rev = curRev;
 
             // Return previous value.
             return doGetValue(key, lastRev);
@@ -129,15 +138,24 @@ public class SimpleInMemoryKeyValueStorage implements KeyValueStorage {
     @Override
     public void remove(byte[] key) {
         synchronized (mux) {
-            Entry e = doGet(key, LATEST_REV, false);
-
-            if (e.empty() || e.tombstone())
-                return;
+            long curRev = rev + 1;
 
-            doPut(key, TOMBSTONE);
+            if (doRemove(key, curRev))
+                rev = curRev;
         }
     }
 
+    private boolean doRemove(byte[] key, long curRev) {
+        Entry e = doGet(key, LATEST_REV, false);
+
+        if (e.empty() || e.tombstone())
+            return false;
+
+        doPut(key, TOMBSTONE, curRev);
+
+        return true;
+    }
+
     @NotNull
     @Override public Entry getAndRemove(byte[] key) {
         synchronized (mux) {
@@ -204,6 +222,47 @@ public class SimpleInMemoryKeyValueStorage implements KeyValueStorage {
         return res;
     }
 
+    @Override public boolean invoke(Condition condition, Collection<Operation> success, Collection<Operation> failure) {
+        synchronized (mux) {
+            Entry e = get(condition.key());
+
+            boolean branch = condition.test(e);
+
+            Collection<Operation> ops = branch ? success : failure;
+
+            long curRev = rev + 1;
+
+            boolean modified = false;
+
+            for (Operation op : ops) {
+                switch (op.type()) {
+                    case PUT:
+                        doPut(op.key(), op.value(), curRev);
+
+                        modified = true;
+
+                        break;
+
+                    case REMOVE:
+                        modified |= doRemove(op.key(), curRev);
+
+                        break;
+
+                    case NO_OP:
+                        break;
+
+                    default:
+                        throw new IllegalArgumentException("Unknown operation type: " + op.type());
+                }
+            }
+
+            if (modified)
+                rev = curRev;
+
+            return branch;
+        }
+    }
+
     @Override public Cursor<Entry> range(byte[] keyFrom, byte[] keyTo) {
         return new RangeCursor(keyFrom, keyTo, rev);
     }
@@ -365,9 +424,7 @@ public class SimpleInMemoryKeyValueStorage implements KeyValueStorage {
         return new Entry(key, lastVal.bytes() , lastRev, lastVal.updateCounter());
     }
 
-    private long doPut(byte[] key, byte[] bytes) {
-        long curRev = ++rev;
-
+    private long doPut(byte[] key, byte[] bytes, long curRev) {
         long curUpdCntr = ++updCntr;
 
         // Update keysIdx.
@@ -378,13 +435,25 @@ public class SimpleInMemoryKeyValueStorage implements KeyValueStorage {
         revs.add(curRev);
 
         // Update revsIdx.
-        NavigableMap<byte[], Value> entries = new TreeMap<>(CMP);
+        //NavigableMap<byte[], Value> entries = new TreeMap<>(CMP);
 
         Value val = new Value(bytes, curUpdCntr);
 
-        entries.put(key, val);
+        //entries.put(key, val);
 
-        revsIdx.put(curRev, entries);
+        //revsIdx.put(curRev, entries);
+
+        revsIdx.compute(
+                curRev,
+                (rev, entries) -> {
+                    if (entries == null)
+                        entries = new TreeMap<>(CMP);
+
+                    entries.put(key, val);
+
+                    return entries;
+                }
+        );
 
         return lastRev;
     }
diff --git a/modules/metastorage-server/src/main/java/org/apache/ignite/internal/metastorage/server/ValueCondition.java b/modules/metastorage-server/src/main/java/org/apache/ignite/internal/metastorage/server/ValueCondition.java
new file mode 100644
index 0000000..dfd9f8e
--- /dev/null
+++ b/modules/metastorage-server/src/main/java/org/apache/ignite/internal/metastorage/server/ValueCondition.java
@@ -0,0 +1,49 @@
+package org.apache.ignite.internal.metastorage.server;
+
+import java.util.Arrays;
+
+public class ValueCondition extends AbstractCondition {
+    private final Type type;
+
+    private final byte[] val;
+
+    public ValueCondition(Type type, byte[] key, byte[] val) {
+        super(key);
+        this.type = type;
+        this.val = val;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean test(Entry e) {
+        int res = Arrays.compare(e.value(), val);
+
+        return type.test(res);
+    }
+
+    /**
+     * Defines possible condition types which can be applied to the value.
+     */
+    public enum Type {
+        /** Equality condition type. */
+        EQUAL {
+            @Override public boolean test(long res) {
+                return res == 0;
+            }
+        },
+
+        /** Inequality condition type. */
+        NOT_EQUAL {
+            @Override public boolean test(long res) {
+                return res != 0;
+            }
+        };
+
+        /**
+         * Interprets comparison result.
+         *
+         * @param res The result of comparison (zero means the compared values are equal).
+         * @return The interpretation of the comparison result.
+         */
+        public abstract boolean test(long res);
+    }
+}
diff --git a/modules/metastorage-server/src/test/java/org/apache/ignite/internal/metastorage/server/RevisionConditionTest.java b/modules/metastorage-server/src/test/java/org/apache/ignite/internal/metastorage/server/RevisionConditionTest.java
new file mode 100644
index 0000000..3a08c13
--- /dev/null
+++ b/modules/metastorage-server/src/test/java/org/apache/ignite/internal/metastorage/server/RevisionConditionTest.java
@@ -0,0 +1,65 @@
+package org.apache.ignite.internal.metastorage.server;
+
+import org.junit.jupiter.api.Test;
+
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class RevisionConditionTest {
+    private static final byte[] key = new byte[] {1};
+
+    private static final byte[] val = new byte[] {2};
+
+    @Test
+    public void eq() {
+        RevisionCondition cond = new RevisionCondition(RevisionCondition.Type.EQUAL, key, 1);
+
+        // 1 == 1.
+        assertTrue(cond.test(new Entry(key, val, 1, 1)));
+    }
+
+    @Test
+    public void ne() {
+        RevisionCondition cond = new RevisionCondition(RevisionCondition.Type.NOT_EQUAL, key, 1);
+
+        // 2 != 1.
+        assertTrue(cond.test(new Entry(key, val, 2, 1)));
+    }
+
+    @Test
+    public void gt() {
+        RevisionCondition cond = new RevisionCondition(RevisionCondition.Type.GREATER, key, 1);
+
+        // 2 > 1.
+        assertTrue(cond.test(new Entry(key, val, 2, 1)));
+    }
+
+    @Test
+    public void ge() {
+        RevisionCondition cond = new RevisionCondition(RevisionCondition.Type.GREATER_OR_EQUAL, key, 1);
+
+        // 2 >= 1 (2 > 1).
+        assertTrue(cond.test(new Entry(key, val, 2, 1)));
+
+        // 1 >= 1 (1 == 1).
+        assertTrue(cond.test(new Entry(key, val, 1, 1)));
+    }
+
+    @Test
+    public void lt() {
+        RevisionCondition cond = new RevisionCondition(RevisionCondition.Type.LESS, key, 2);
+
+        // 1 < 2
+        assertTrue(cond.test(new Entry(key, val, 1, 1)));
+    }
+
+    @Test
+    public void le() {
+        RevisionCondition cond = new RevisionCondition(RevisionCondition.Type.LESS_OR_EQUAL, key, 2);
+
+        // 1 <= 2 (1 < 2)
+        assertTrue(cond.test(new Entry(key, val, 1, 1)));
+
+        // 2 <= 2 (2 == 2).
+        assertTrue(cond.test(new Entry(key, val, 2, 1)));
+    }
+}
diff --git a/modules/metastorage-server/src/test/java/org/apache/ignite/internal/metastorage/server/SimpleInMemoryKeyValueStorageTest.java b/modules/metastorage-server/src/test/java/org/apache/ignite/internal/metastorage/server/SimpleInMemoryKeyValueStorageTest.java
index 27df790..2cda29b 100644
--- a/modules/metastorage-server/src/test/java/org/apache/ignite/internal/metastorage/server/SimpleInMemoryKeyValueStorageTest.java
+++ b/modules/metastorage-server/src/test/java/org/apache/ignite/internal/metastorage/server/SimpleInMemoryKeyValueStorageTest.java
@@ -939,6 +939,329 @@ class SimpleInMemoryKeyValueStorageTest {
     }
 
     @Test
+    public void invokeWithRevisionCondition_successBranch() {
+        byte[] key1 = k(1);
+        byte[] val1_1 = kv(1, 11);
+        byte[] val1_2 = kv(1, 12);
+
+        byte[] key2 = k(2);
+        byte[] val2 = kv(2, 2);
+
+        byte[] key3 = k(3);
+        byte[] val3 = kv(3, 3);
+
+        assertEquals(0, storage.revision());
+        assertEquals(0, storage.updateCounter());
+
+        storage.put(key1, val1_1);
+
+        assertEquals(1, storage.revision());
+        assertEquals(1, storage.updateCounter());
+
+        boolean branch = storage.invoke(
+                new RevisionCondition(RevisionCondition.Type.EQUAL, key1, 1),
+                List.of(
+                        new Operation(Operation.Type.PUT, key1, val1_2),
+                        new Operation(Operation.Type.PUT, key2, val2)
+                ),
+                List.of(new Operation(Operation.Type.PUT, key3, val3))
+        );
+
+        // "Success" branch is applied.
+        assertTrue(branch);
+        assertEquals(2, storage.revision());
+        assertEquals(3, storage.updateCounter());
+
+        Entry e1 = storage.get(key1);
+
+        assertFalse(e1.empty());
+        assertFalse(e1.tombstone());
+        assertEquals(2, e1.revision());
+        assertEquals(2, e1.updateCounter());
+        assertArrayEquals(val1_2, e1.value());
+
+        Entry e2 = storage.get(key2);
+
+        assertFalse(e2.empty());
+        assertFalse(e2.tombstone());
+        assertEquals(2, e2.revision());
+        assertEquals(3, e2.updateCounter());
+        assertArrayEquals(val2, e2.value());
+
+        // "Failure" branch isn't applied.
+        Entry e3 = storage.get(key3);
+
+        assertTrue(e3.empty());
+    }
+
+    @Test
+    public void invokeWithRevisionCondition_failureBranch() {
+        byte[] key1 = k(1);
+        byte[] val1_1 = kv(1, 11);
+        byte[] val1_2 = kv(1, 12);
+
+        byte[] key2 = k(2);
+        byte[] val2 = kv(2, 2);
+
+        byte[] key3 = k(3);
+        byte[] val3 = kv(3, 3);
+
+        assertEquals(0, storage.revision());
+        assertEquals(0, storage.updateCounter());
+
+        storage.put(key1, val1_1);
+
+        assertEquals(1, storage.revision());
+        assertEquals(1, storage.updateCounter());
+
+        boolean branch = storage.invoke(
+                new RevisionCondition(RevisionCondition.Type.EQUAL, key1, 2),
+                List.of(new Operation(Operation.Type.PUT, key3, val3)),
+                List.of(
+                        new Operation(Operation.Type.PUT, key1, val1_2),
+                        new Operation(Operation.Type.PUT, key2, val2)
+                )
+        );
+
+        // "Failure" branch is applied.
+        assertFalse(branch);
+        assertEquals(2, storage.revision());
+        assertEquals(3, storage.updateCounter());
+
+        Entry e1 = storage.get(key1);
+
+        assertFalse(e1.empty());
+        assertFalse(e1.tombstone());
+        assertEquals(2, e1.revision());
+        assertEquals(2, e1.updateCounter());
+        assertArrayEquals(val1_2, e1.value());
+
+        Entry e2 = storage.get(key2);
+
+        assertFalse(e2.empty());
+        assertFalse(e2.tombstone());
+        assertEquals(2, e2.revision());
+        assertEquals(3, e2.updateCounter());
+        assertArrayEquals(val2, e2.value());
+
+        // "Success" branch isn't applied.
+        Entry e3 = storage.get(key3);
+
+        assertTrue(e3.empty());
+    }
+
+    @Test
+    public void invokeWithValueCondition_successBranch() {
+        byte[] key1 = k(1);
+        byte[] val1_1 = kv(1, 11);
+        byte[] val1_2 = kv(1, 12);
+
+        byte[] key2 = k(2);
+        byte[] val2 = kv(2, 2);
+
+        byte[] key3 = k(3);
+        byte[] val3 = kv(3, 3);
+
+        assertEquals(0, storage.revision());
+        assertEquals(0, storage.updateCounter());
+
+        storage.put(key1, val1_1);
+
+        assertEquals(1, storage.revision());
+        assertEquals(1, storage.updateCounter());
+
+        boolean branch = storage.invoke(
+                new ValueCondition(ValueCondition.Type.EQUAL, key1, val1_1),
+                List.of(
+                        new Operation(Operation.Type.PUT, key1, val1_2),
+                        new Operation(Operation.Type.PUT, key2, val2)
+                ),
+                List.of(new Operation(Operation.Type.PUT, key3, val3))
+        );
+
+        // "Success" branch is applied.
+        assertTrue(branch);
+        assertEquals(2, storage.revision());
+        assertEquals(3, storage.updateCounter());
+
+        Entry e1 = storage.get(key1);
+
+        assertFalse(e1.empty());
+        assertFalse(e1.tombstone());
+        assertEquals(2, e1.revision());
+        assertEquals(2, e1.updateCounter());
+        assertArrayEquals(val1_2, e1.value());
+
+        Entry e2 = storage.get(key2);
+
+        assertFalse(e2.empty());
+        assertFalse(e2.tombstone());
+        assertEquals(2, e2.revision());
+        assertEquals(3, e2.updateCounter());
+        assertArrayEquals(val2, e2.value());
+
+        // "Failure" branch isn't applied.
+        Entry e3 = storage.get(key3);
+
+        assertTrue(e3.empty());
+    }
+
+    @Test
+    public void invokeWithValueCondition_failureBranch() {
+        byte[] key1 = k(1);
+        byte[] val1_1 = kv(1, 11);
+        byte[] val1_2 = kv(1, 12);
+
+        byte[] key2 = k(2);
+        byte[] val2 = kv(2, 2);
+
+        byte[] key3 = k(3);
+        byte[] val3 = kv(3, 3);
+
+        assertEquals(0, storage.revision());
+        assertEquals(0, storage.updateCounter());
+
+        storage.put(key1, val1_1);
+
+        assertEquals(1, storage.revision());
+        assertEquals(1, storage.updateCounter());
+
+        boolean branch = storage.invoke(
+                new ValueCondition(ValueCondition.Type.EQUAL, key1, val1_2),
+                List.of(new Operation(Operation.Type.PUT, key3, val3)),
+                List.of(
+                        new Operation(Operation.Type.PUT, key1, val1_2),
+                        new Operation(Operation.Type.PUT, key2, val2)
+                )
+        );
+
+        // "Failure" branch is applied.
+        assertFalse(branch);
+        assertEquals(2, storage.revision());
+        assertEquals(3, storage.updateCounter());
+
+        Entry e1 = storage.get(key1);
+
+        assertFalse(e1.empty());
+        assertFalse(e1.tombstone());
+        assertEquals(2, e1.revision());
+        assertEquals(2, e1.updateCounter());
+        assertArrayEquals(val1_2, e1.value());
+
+        Entry e2 = storage.get(key2);
+
+        assertFalse(e2.empty());
+        assertFalse(e2.tombstone());
+        assertEquals(2, e2.revision());
+        assertEquals(3, e2.updateCounter());
+        assertArrayEquals(val2, e2.value());
+
+        // "Success" branch isn't applied.
+        Entry e3 = storage.get(key3);
+
+        assertTrue(e3.empty());
+    }
+
+    @Test
+    public void invokeOperations() {
+        byte[] key1 = k(1);
+        byte[] val1 = kv(1, 1);
+
+        byte[] key2 = k(2);
+        byte[] val2 = kv(2, 2);
+
+        byte[] key3 = k(3);
+        byte[] val3 = kv(3, 3);
+
+        assertEquals(0, storage.revision());
+        assertEquals(0, storage.updateCounter());
+
+        storage.put(key1, val1);
+
+        assertEquals(1, storage.revision());
+        assertEquals(1, storage.updateCounter());
+
+        // No-op.
+        boolean branch = storage.invoke(
+                new ValueCondition(ValueCondition.Type.EQUAL, key1, val1),
+                List.of(new Operation(Operation.Type.NO_OP, null, null)),
+                List.of(new Operation(Operation.Type.NO_OP, null, null))
+        );
+
+        assertTrue(branch);
+
+        // No updates.
+        assertEquals(1, storage.revision());
+        assertEquals(1, storage.updateCounter());
+
+        // Put.
+        branch = storage.invoke(
+                new ValueCondition(ValueCondition.Type.EQUAL, key1, val1),
+                List.of(
+                        new Operation(Operation.Type.PUT, key2, val2),
+                        new Operation(Operation.Type.PUT, key3, val3)
+                ),
+                List.of(new Operation(Operation.Type.NO_OP, null, null))
+        );
+
+        assertTrue(branch);
+
+        // +1 for revision, +2 for update counter.
+        assertEquals(2, storage.revision());
+        assertEquals(3, storage.updateCounter());
+
+        Entry e2 = storage.get(key2);
+
+        assertFalse(e2.empty());
+        assertFalse(e2.tombstone());
+        assertEquals(2, e2.revision());
+        assertEquals(2, e2.updateCounter());
+        assertArrayEquals(key2, e2.key());
+        assertArrayEquals(val2, e2.value());
+
+        Entry e3 = storage.get(key3);
+
+        assertFalse(e3.empty());
+        assertFalse(e3.tombstone());
+        assertEquals(2, e3.revision());
+        assertEquals(3, e3.updateCounter());
+        assertArrayEquals(key3, e3.key());
+        assertArrayEquals(val3, e3.value());
+
+        // Remove.
+        branch = storage.invoke(
+                new ValueCondition(ValueCondition.Type.EQUAL, key1, val1),
+                List.of(
+                        new Operation(Operation.Type.REMOVE, key2, null),
+                        new Operation(Operation.Type.REMOVE, key3, null)
+                ),
+                List.of(new Operation(Operation.Type.NO_OP, null, null))
+        );
+
+        assertTrue(branch);
+
+        // +1 for revision, +2 for update counter.
+        assertEquals(3, storage.revision());
+        assertEquals(5, storage.updateCounter());
+
+        e2 = storage.get(key2);
+
+        assertFalse(e2.empty());
+        assertTrue(e2.tombstone());
+        assertEquals(3, e2.revision());
+        assertEquals(4, e2.updateCounter());
+        assertArrayEquals(key2, e2.key());
+
+        e3 = storage.get(key3);
+
+        assertFalse(e3.empty());
+        assertTrue(e3.tombstone());
+        assertEquals(3, e3.revision());
+        assertEquals(5, e3.updateCounter());
+        assertArrayEquals(key3, e3.key());
+    }
+
+    @Test
     public void compact() {
         assertEquals(0, storage.revision());
         assertEquals(0, storage.updateCounter());
diff --git a/modules/metastorage-server/src/test/java/org/apache/ignite/internal/metastorage/server/ValueConditionTest.java b/modules/metastorage-server/src/test/java/org/apache/ignite/internal/metastorage/server/ValueConditionTest.java
new file mode 100644
index 0000000..717da54
--- /dev/null
+++ b/modules/metastorage-server/src/test/java/org/apache/ignite/internal/metastorage/server/ValueConditionTest.java
@@ -0,0 +1,27 @@
+package org.apache.ignite.internal.metastorage.server;
+
+import org.junit.jupiter.api.Test;
+
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class ValueConditionTest {
+    private static final byte[] key = new byte[] {1};
+
+    private static final byte[] val1 = new byte[] {11};
+
+    private static final byte[] val2 = new byte[] {22};
+
+    @Test
+    public void eq() {
+        ValueCondition cond = new ValueCondition(ValueCondition.Type.EQUAL, key, val1);
+
+        assertTrue(cond.test(new Entry(key, val1, 1, 1)));
+    }
+
+    @Test
+    public void ne() {
+        ValueCondition cond = new ValueCondition(ValueCondition.Type.NOT_EQUAL, key, val1);
+
+        assertTrue(cond.test(new Entry(key, val2, 1, 1)));
+    }
+}
diff --git a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/MetaStorageManager.java b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/MetaStorageManager.java
index 4e06338..3a62146 100644
--- a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/MetaStorageManager.java
+++ b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/MetaStorageManager.java
@@ -303,27 +303,25 @@ import org.jetbrains.annotations.Nullable;
     }
 
     /**
-     * @see MetaStorageService#invoke(Key, Condition, Operation, Operation)
+     * @see MetaStorageService#invoke(Condition, Operation, Operation)
      */
     public @NotNull CompletableFuture<Boolean> invoke(
-        @NotNull Key key,
         @NotNull Condition cond,
         @NotNull Operation success,
         @NotNull Operation failure
     ) {
-        return metaStorageSvc.invoke(key, cond, success, failure);
+        return metaStorageSvc.invoke(cond, success, failure);
     }
 
     /**
-     * @see MetaStorageService#getAndInvoke(Key, Condition, Operation, Operation)
+     * @see MetaStorageService#getAndInvoke(Condition, Operation, Operation)
      */
     public @NotNull CompletableFuture<Entry> getAndInvoke(
-        @NotNull Key key,
         @NotNull Condition cond,
         @NotNull Operation success,
         @NotNull Operation failure
     ) {
-        return metaStorageSvc.getAndInvoke(key, cond, success, failure);
+        return metaStorageSvc.getAndInvoke(cond, success, failure);
     }
 
     /**
diff --git a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgnitionImpl.java b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgnitionImpl.java
index 6c6b0f2..6dc1c9b 100644
--- a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgnitionImpl.java
+++ b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgnitionImpl.java
@@ -213,70 +213,79 @@ public class IgnitionImpl implements Ignition {
     private static MetaStorageService metaStorageServiceMock() {
         return new MetaStorageService() {
             @Override public @NotNull CompletableFuture<Entry> get(@NotNull Key key) {
-                throw new UnsupportedOperationException("Metastorage service is not implemented yet");
+                throw new UnsupportedOperationException("Meta storage service is not implemented yet");
             }
 
             @Override public @NotNull CompletableFuture<Entry> get(@NotNull Key key, long revUpperBound) {
-                throw new UnsupportedOperationException("Metastorage service is not implemented yet");
+                throw new UnsupportedOperationException("Meta storage service is not implemented yet");
             }
 
             @Override public @NotNull CompletableFuture<Map<Key, Entry>> getAll(Collection<Key> keys) {
-                throw new UnsupportedOperationException("Metastorage service is not implemented yet");
+                throw new UnsupportedOperationException("Meta storage service is not implemented yet");
             }
 
             @Override public @NotNull CompletableFuture<Map<Key, Entry>> getAll(Collection<Key> keys, long revUpperBound) {
-                throw new UnsupportedOperationException("Metastorage service is not implemented yet");
+                throw new UnsupportedOperationException("Meta storage service is not implemented yet");
             }
 
             @Override public @NotNull CompletableFuture<Void> put(@NotNull Key key, @NotNull byte[] value) {
-                throw new UnsupportedOperationException("Metastorage service is not implemented yet");
+                throw new UnsupportedOperationException("Meta storage service is not implemented yet");
             }
 
             @Override public @NotNull CompletableFuture<Entry> getAndPut(@NotNull Key key, @NotNull byte[] value) {
-                throw new UnsupportedOperationException("Metastorage service is not implemented yet");
+                throw new UnsupportedOperationException("Meta storage service is not implemented yet");
             }
 
             @Override public @NotNull CompletableFuture<Void> putAll(@NotNull Map<Key, byte[]> vals) {
-                throw new UnsupportedOperationException("Metastorage service is not implemented yet");
+                throw new UnsupportedOperationException("Meta storage service is not implemented yet");
             }
 
             @Override public @NotNull CompletableFuture<Map<Key, Entry>> getAndPutAll(@NotNull Map<Key, byte[]> vals) {
-                throw new UnsupportedOperationException("Metastorage service is not implemented yet");
+                throw new UnsupportedOperationException("Meta storage service is not implemented yet");
             }
 
             @Override public @NotNull CompletableFuture<Void> remove(@NotNull Key key) {
-                throw new UnsupportedOperationException("Metastorage service is not implemented yet");
+                throw new UnsupportedOperationException("Meta storage service is not implemented yet");
             }
 
             @Override public @NotNull CompletableFuture<Entry> getAndRemove(@NotNull Key key) {
-                throw new UnsupportedOperationException("Metastorage service is not implemented yet");
+                throw new UnsupportedOperationException("Meta storage service is not implemented yet");
             }
 
             @Override public @NotNull CompletableFuture<Void> removeAll(@NotNull Collection<Key> keys) {
-                throw new UnsupportedOperationException("Metastorage service is not implemented yet");
+                throw new UnsupportedOperationException("Meta storage service is not implemented yet");
             }
 
             @Override
             public @NotNull CompletableFuture<Map<Key, Entry>> getAndRemoveAll(@NotNull Collection<Key> keys) {
-                throw new UnsupportedOperationException("Metastorage service is not implemented yet");
+                throw new UnsupportedOperationException("Meta storage service is not implemented yet");
             }
 
-            @Override public @NotNull CompletableFuture<Boolean> invoke(@NotNull Key key, @NotNull Condition condition,
+            @Override public @NotNull CompletableFuture<Boolean> invoke(@NotNull Condition condition,
                 @NotNull Operation success, @NotNull Operation failure) {
-                throw new UnsupportedOperationException("Metastorage service is not implemented yet");
+                throw new UnsupportedOperationException("Meta storage service is not implemented yet");
             }
 
-            @Override public @NotNull CompletableFuture<Entry> getAndInvoke(@NotNull Key key, @NotNull Condition condition,
-                @NotNull Operation success, @NotNull Operation failure) {
-                throw new UnsupportedOperationException("Metastorage service is not implemented yet");
+            @Override public @NotNull CompletableFuture<Boolean> invoke(@NotNull Condition condition,
+                                                                        @NotNull Collection<Operation> success,
+                                                                        @NotNull Collection<Operation> failure
+            ) {
+                throw new UnsupportedOperationException("Meta storage service is not implemented yet");
+            }
+
+            @Override public @NotNull CompletableFuture<Entry> getAndInvoke(@NotNull Condition condition,
+                                                                            @NotNull Operation success,
+                                                                            @NotNull Operation failure
+            ) {
+                throw new UnsupportedOperationException("Meta storage service is not implemented yet");
             }
 
             @Override public @NotNull Cursor<Entry> range(@NotNull Key keyFrom, @Nullable Key keyTo, long revUpperBound) {
-                throw new UnsupportedOperationException("Metastorage service is not implemented yet");
+                throw new UnsupportedOperationException("Meta storage service is not implemented yet");
             }
 
             @Override public @NotNull Cursor<Entry> range(@NotNull Key keyFrom, @Nullable Key keyTo) {
-                throw new UnsupportedOperationException("Metastorage service is not implemented yet");
+                throw new UnsupportedOperationException("Meta storage service is not implemented yet");
             }
 
             @Override public @NotNull CompletableFuture<IgniteUuid> watch(@Nullable Key keyFrom, @Nullable Key keyTo,
@@ -295,11 +304,11 @@ public class IgnitionImpl implements Ignition {
             }
 
             @Override public @NotNull CompletableFuture<Void> stopWatch(@NotNull IgniteUuid id) {
-                throw new UnsupportedOperationException("Metastorage service is not implemented yet");
+                throw new UnsupportedOperationException("Meta storage service is not implemented yet");
             }
 
             @Override public @NotNull CompletableFuture<Void> compact() {
-                throw new UnsupportedOperationException("Metastorage service is not implemented yet");
+                throw new UnsupportedOperationException("Meta storage service is not implemented yet");
             }
         };
     }
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
index 86a04e4..593ec5b 100644
--- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
@@ -221,15 +221,15 @@ public class TableManager implements IgniteTables {
 
                 UUID tblId = new UUID(revision, update);
 
+                Key key = new Key(INTERNAL_PREFIX + tblId);
                 CompletableFuture<Boolean> fut = metaStorageMgr.invoke(
-                    new Key(INTERNAL_PREFIX + tblId.toString()),
-                    Conditions.value().eq(null),
-                    Operations.put(tableView.name().getBytes(StandardCharsets.UTF_8)),
+                    Conditions.value(key).eq(null),
+                    Operations.put(key, tableView.name().getBytes(StandardCharsets.UTF_8)),
                     Operations.noop());
 
                 try {
                     if (fut.get()) {
-                        metaStorageMgr.put(new Key(INTERNAL_PREFIX + "assignment." + tblId.toString()), new byte[0]);
+                        metaStorageMgr.put(new Key(INTERNAL_PREFIX + "assignment." + tblId), new byte[0]);
 
                         LOG.info("Table manager created a table [name={}, revision={}]",
                             tableView.name(), revision);

[ignite-3] 02/08: IGNITE-14398: Meta storage: added update counter

Posted by ag...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

agura pushed a commit to branch ignite-14389
in repository https://gitbox.apache.org/repos/asf/ignite-3.git

commit a58026e019b4b6343357855d6b0df5c0b20c7b04
Author: Andrey Gura <ag...@apache.org>
AuthorDate: Wed Mar 31 18:18:09 2021 +0300

    IGNITE-14398: Meta storage: added update counter
---
 .../ignite/internal/metastorage/server/Entry.java  | 37 +++++++++---
 .../metastorage/server/KeyValueStorage.java        |  2 +
 .../server/SimpleInMemoryKeyValueStorage.java      | 70 +++++++++++++---------
 .../ignite/internal/metastorage/server/Value.java  | 27 +++++++++
 .../server/SimpleInMemoryKeyValueStorageTest.java  | 29 +++++++++
 5 files changed, 130 insertions(+), 35 deletions(-)

diff --git a/modules/metastorage-server/src/main/java/org/apache/ignite/internal/metastorage/server/Entry.java b/modules/metastorage-server/src/main/java/org/apache/ignite/internal/metastorage/server/Entry.java
index 442aef9..263a88b 100644
--- a/modules/metastorage-server/src/main/java/org/apache/ignite/internal/metastorage/server/Entry.java
+++ b/modules/metastorage-server/src/main/java/org/apache/ignite/internal/metastorage/server/Entry.java
@@ -44,20 +44,31 @@ public class Entry {
     final private long rev;
 
     /**
+     * Update counter corresponding to this particular entry.
+     * <p>
+     *     {@code updCntr == 0} for {@link #empty()} entry,
+     *     {@code updCntr > 0} for regular and {@link #tombstone()} entries.
+     * </p>
+     */
+    final private long updCntr;
+
+    /**
      * Constructor.
      *
      * @param key Key bytes. Couldn't be {@code null}.
      * @param val Value bytes. Couldn't be {@code null}.
      * @param rev Revision.
+     * @param updCntr Update counter.
      */
     // TODO: It seems user will never create Entry, so we can reduce constructor scope to protected or package-private and reuse it from two-place private constructor.
-    public Entry(@NotNull byte[] key, @NotNull byte[] val, long rev) {
+    public Entry(@NotNull byte[] key, @NotNull byte[] val, long rev, long updCntr) {
         assert key != null : "key can't be null";
         assert val != null : "value can't be null";
 
         this.key = key;
         this.val = val;
         this.rev = rev;
+        this.updCntr = updCntr;
     }
 
     /**
@@ -65,13 +76,15 @@ public class Entry {
      *
      * @param key Key bytes. Couldn't be {@code null}.
      * @param rev Revision.
+     * @param updCntr Update counter.
      */
-    private Entry(@NotNull byte[] key, long rev) {
+    private Entry(@NotNull byte[] key, long rev, long updCntr) {
         assert key != null : "key can't be null";
 
         this.key = key;
         this.val = null;
         this.rev = rev;
+        this.updCntr = updCntr;
     }
 
     /**
@@ -82,20 +95,22 @@ public class Entry {
      */
     @NotNull
     public static Entry empty(byte[] key) {
-        return new Entry(key, 0);
+        return new Entry(key, 0, 0);
     }
 
     /**
      * Creates an instance of tombstone entry for a given key and a revision.
      *
      * @param key Key bytes. Couldn't be {@code null}.
+     * @param rev Revision.
+     * @param updCntr Update counter.
      * @return Empty entry.
      */
     @NotNull
-    public static Entry tombstone(byte[] key, long rev) {
+    public static Entry tombstone(byte[] key, long rev, long updCntr) {
         assert rev > 0 : "rev must be positive for tombstone entry.";
 
-        return new Entry(key, rev);
+        return new Entry(key, rev, updCntr);
     }
 
     /**
@@ -127,12 +142,20 @@ public class Entry {
     }
 
     /**
+     * Returns an update counter.
+     * @return Update counter.
+     */
+    public long updateCounter() {
+        return updCntr;
+    }
+
+    /**
      * Returns value which denotes whether entry is tombstone or not.
      *
      * @return {@code True} if entry is tombstone, otherwise - {@code false}.
      */
     public boolean tombstone() {
-        return val == null && rev > 0;
+        return val == null && rev > 0 && updCntr > 0;
     }
 
     /**
@@ -141,6 +164,6 @@ public class Entry {
      * @return {@code True} if entry is empty, otherwise - {@code false}.
      */
     public boolean empty() {
-        return val == null && rev == 0;
+        return val == null && rev == 0 && updCntr == 0;
     }
 }
diff --git a/modules/metastorage-server/src/main/java/org/apache/ignite/internal/metastorage/server/KeyValueStorage.java b/modules/metastorage-server/src/main/java/org/apache/ignite/internal/metastorage/server/KeyValueStorage.java
index 1bf6b78..e245e08 100644
--- a/modules/metastorage-server/src/main/java/org/apache/ignite/internal/metastorage/server/KeyValueStorage.java
+++ b/modules/metastorage-server/src/main/java/org/apache/ignite/internal/metastorage/server/KeyValueStorage.java
@@ -8,6 +8,8 @@ public interface KeyValueStorage {
 
     long revision();
 
+    long updateCounter();
+
     @NotNull
     Entry put(byte[] key, byte[] value);
 
diff --git a/modules/metastorage-server/src/main/java/org/apache/ignite/internal/metastorage/server/SimpleInMemoryKeyValueStorage.java b/modules/metastorage-server/src/main/java/org/apache/ignite/internal/metastorage/server/SimpleInMemoryKeyValueStorage.java
index 9059aec..b764998 100644
--- a/modules/metastorage-server/src/main/java/org/apache/ignite/internal/metastorage/server/SimpleInMemoryKeyValueStorage.java
+++ b/modules/metastorage-server/src/main/java/org/apache/ignite/internal/metastorage/server/SimpleInMemoryKeyValueStorage.java
@@ -12,23 +12,25 @@ import java.util.TreeMap;
 import org.jetbrains.annotations.NotNull;
 import org.jetbrains.annotations.TestOnly;
 
+import static org.apache.ignite.internal.metastorage.server.Value.TOMBSTONE;
+
 /**
  * WARNING: Only for test purposes and only for non-distributed (one static instance) storage.
  */
 public class SimpleInMemoryKeyValueStorage implements KeyValueStorage {
     private static final Comparator<byte[]> LEXICOGRAPHIC_COMPARATOR = Arrays::compare;
 
-    private static final byte[] TOMBSTONE = new byte[0];
-
     private static final long LATEST_REV = -1;
 
     private final Watcher watcher;
 
     private NavigableMap<byte[], List<Long>> keysIdx = new TreeMap<>(LEXICOGRAPHIC_COMPARATOR);
 
-    private NavigableMap<Long, NavigableMap<byte[], byte[]>> revsIdx = new TreeMap<>();
+    private NavigableMap<Long, NavigableMap<byte[], Value>> revsIdx = new TreeMap<>();
 
-    private long grev = 0;
+    private long rev;
+
+    private long updCntr;
 
     private final Object mux = new Object();
 
@@ -37,35 +39,45 @@ public class SimpleInMemoryKeyValueStorage implements KeyValueStorage {
     }
 
     @Override public long revision() {
-        return grev;
+        return rev;
+    }
+
+    @Override public long updateCounter() {
+        return updCntr;
     }
 
     @NotNull
-    @Override public Entry put(byte[] key, byte[] val) {
+    @Override public Entry put(byte[] key, byte[] bytes) {
         synchronized (mux) {
-            long crev = ++grev;
+            long curRev = ++rev;
+
+            long curUpdCntr = ++updCntr;
 
             // Update keysIdx.
             List<Long> revs = keysIdx.computeIfAbsent(key, k -> new ArrayList<>());
 
-            long lrev = revs.isEmpty() ? 0 : lastRevision(revs);
+            long lastRev = revs.isEmpty() ? 0 : lastRevision(revs);
 
-            revs.add(crev);
+            revs.add(curRev);
 
             // Update revsIdx.
-            NavigableMap<byte[], byte[]> entries = new TreeMap<>(LEXICOGRAPHIC_COMPARATOR);
+            NavigableMap<byte[], Value> entries = new TreeMap<>(LEXICOGRAPHIC_COMPARATOR);
+
+            Value val = new Value(bytes, curUpdCntr);
 
             entries.put(key, val);
 
-            revsIdx.put(crev, entries);
+            revsIdx.put(curRev, entries);
 
             // Return previous value.
-            if (lrev == 0)
+            if (lastRev == 0)
                 return Entry.empty(key);
 
-            NavigableMap<byte[], byte[]> lastVal = revsIdx.get(lrev);
+            NavigableMap<byte[], Value> lastRevVals = revsIdx.get(lastRev);
+
+            Value lastVal = lastRevVals.get(key);
 
-            Entry res = new Entry(key, lastVal.get(key), lrev);
+            Entry res = new Entry(key, lastVal.bytes(), lastRev, lastVal.updateCounter());
 
             //TODO: notify watchers
 
@@ -158,16 +170,18 @@ public class SimpleInMemoryKeyValueStorage implements KeyValueStorage {
                             //return new AbstractMap.SimpleImmutableEntry<>(key, null);
                         }
 
-                        NavigableMap<byte[], byte[]> vals = revsIdx.get(rev);
+                        NavigableMap<byte[], Value> vals = revsIdx.get(rev);
 
                         if (vals == null || vals.isEmpty()) {
                             throw new IllegalStateException("vals == null || vals.isEmpty()");
                             //return new AbstractMap.SimpleImmutableEntry<>(key, null);
                         }
 
-                        byte[] val = vals.get(key);
+                        Value val = vals.get(key);
 
-                        return val == TOMBSTONE ? Entry.tombstone(key, rev) : new Entry(key, val, rev);
+                        return val.tombstone() ?
+                                Entry.tombstone(key, rev, val.updateCounter()) :
+                                new Entry(key, val.bytes(), rev, val.updateCounter());
                     }
                 }
             };
@@ -178,7 +192,7 @@ public class SimpleInMemoryKeyValueStorage implements KeyValueStorage {
         synchronized (mux) {
             NavigableMap<byte[], List<Long>> compactedKeysIdx = new TreeMap<>(LEXICOGRAPHIC_COMPARATOR);
 
-            NavigableMap<Long, NavigableMap<byte[], byte[]>> compactedRevsIdx = new TreeMap<>();
+            NavigableMap<Long, NavigableMap<byte[], Value>> compactedRevsIdx = new TreeMap<>();
 
             keysIdx.forEach((key, revs) -> compactForKey(key, revs, compactedKeysIdx, compactedRevsIdx));
 
@@ -192,18 +206,18 @@ public class SimpleInMemoryKeyValueStorage implements KeyValueStorage {
             byte[] key,
             List<Long> revs,
             NavigableMap<byte[], List<Long>> compactedKeysIdx,
-            NavigableMap<Long, NavigableMap<byte[], byte[]>> compactedRevsIdx
+            NavigableMap<Long, NavigableMap<byte[], Value>> compactedRevsIdx
     ) {
         Long lrev = lastRevision(revs);
 
-        NavigableMap<byte[], byte[]> kv = revsIdx.get(lrev);
+        NavigableMap<byte[], Value> kv = revsIdx.get(lrev);
 
-        byte[] lastVal = kv.get(key);
+        Value lastVal = kv.get(key);
 
-        if (lastVal != TOMBSTONE) {
+        if (!lastVal.tombstone()) {
             compactedKeysIdx.put(key, listOf(lrev));
 
-            NavigableMap<byte[], byte[]> compactedKv = compactedRevsIdx.computeIfAbsent(
+            NavigableMap<byte[], Value> compactedKv = compactedRevsIdx.computeIfAbsent(
                     lrev,
                     k -> new TreeMap<>(LEXICOGRAPHIC_COMPARATOR)
             );
@@ -227,17 +241,17 @@ public class SimpleInMemoryKeyValueStorage implements KeyValueStorage {
 
         long lrev = rev == LATEST_REV ? lastRevision(revs) : rev;
 
-        NavigableMap<byte[], byte[]> entries = revsIdx.get(lrev);
+        NavigableMap<byte[], Value> entries = revsIdx.get(lrev);
 
         if (entries == null || entries.isEmpty())
             return Entry.empty(key);
 
-        byte[] val = entries.get(key);
+        Value val = entries.get(key);
 
-        if (val == TOMBSTONE)
-            return Entry.tombstone(key, lrev);
+        if (val.tombstone())
+            return Entry.tombstone(key, lrev, val.updateCounter());
 
-        return new Entry(key, val , lrev);
+        return new Entry(key, val.bytes() , lrev, val.updateCounter());
     }
 
     private static boolean isPrefix(byte[] pref, byte[] term) {
diff --git a/modules/metastorage-server/src/main/java/org/apache/ignite/internal/metastorage/server/Value.java b/modules/metastorage-server/src/main/java/org/apache/ignite/internal/metastorage/server/Value.java
new file mode 100644
index 0000000..250a5ea
--- /dev/null
+++ b/modules/metastorage-server/src/main/java/org/apache/ignite/internal/metastorage/server/Value.java
@@ -0,0 +1,41 @@
+package org.apache.ignite.internal.metastorage.server;
+
+import org.jetbrains.annotations.NotNull;
+
+/**
+ * Stored value: the raw bytes together with the update counter that was passed in when it was created.
+ */
+public class Value {
+    /** Marker array for removed keys; recognised by reference identity, never by content. */
+    public static final byte[] TOMBSTONE = new byte[0];
+
+    /** Raw bytes, or the {@link #TOMBSTONE} marker. */
+    private final byte[] bytes;
+
+    /** Update counter supplied at construction. */
+    private final long updCntr;
+
+    /**
+     * @param bytes Raw bytes; pass {@link #TOMBSTONE} itself to mark a removal.
+     * @param updCntr Update counter to associate with the bytes.
+     */
+    public Value(@NotNull byte[] bytes, long updCntr) {
+        this.updCntr = updCntr;
+        this.bytes = bytes;
+    }
+
+    /** @return The array given to the constructor (not copied). */
+    public byte[] bytes() {
+        return this.bytes;
+    }
+
+    /** @return Update counter given to the constructor. */
+    public long updateCounter() {
+        return this.updCntr;
+    }
+
+    /** @return {@code true} only if the wrapped array is the very {@link #TOMBSTONE} instance. */
+    boolean tombstone() {
+        return TOMBSTONE == this.bytes;
+    }
+}
diff --git a/modules/metastorage-server/src/test/java/org/apache/ignite/internal/metastorage/server/SimpleInMemoryKeyValueStorageTest.java b/modules/metastorage-server/src/test/java/org/apache/ignite/internal/metastorage/server/SimpleInMemoryKeyValueStorageTest.java
index f7fb17e..eae76fd 100644
--- a/modules/metastorage-server/src/test/java/org/apache/ignite/internal/metastorage/server/SimpleInMemoryKeyValueStorageTest.java
+++ b/modules/metastorage-server/src/test/java/org/apache/ignite/internal/metastorage/server/SimpleInMemoryKeyValueStorageTest.java
@@ -31,11 +31,13 @@ class SimpleInMemoryKeyValueStorageTest {
         byte[] val2_2 = kv(2, 2);
 
         assertEquals(0, storage.revision());
+        assertEquals(0, storage.updateCounter());
 
         // Previous entry is empty.
         Entry emptyEntry = storage.put(key1, val1_1);
 
         assertEquals(1, storage.revision());
+        assertEquals(1, storage.updateCounter());
         assertTrue(emptyEntry.empty());
 
         // Entry with rev == 1.
@@ -46,12 +48,15 @@ class SimpleInMemoryKeyValueStorageTest {
         assertArrayEquals(key1, e1_1.key());
         assertArrayEquals(val1_1, e1_1.value());
         assertEquals(1, e1_1.revision());
+        assertEquals(1, e1_1.updateCounter());
         assertEquals(1, storage.revision());
+        assertEquals(1, storage.updateCounter());
 
         // Previous entry is empty.
         emptyEntry = storage.put(key2, val2_2);
 
         assertEquals(2, storage.revision());
+        assertEquals(2, storage.updateCounter());
         assertTrue(emptyEntry.empty());
 
         // Entry with rev == 2.
@@ -62,7 +67,9 @@ class SimpleInMemoryKeyValueStorageTest {
         assertArrayEquals(key2, e2.key());
         assertArrayEquals(val2_2, e2.value());
         assertEquals(2, e2.revision());
+        assertEquals(2, e2.updateCounter());
         assertEquals(2, storage.revision());
+        assertEquals(2, storage.updateCounter());
 
         // Previous entry is not empty.
         e1_1 = storage.put(key1, val1_3);
@@ -72,7 +79,9 @@ class SimpleInMemoryKeyValueStorageTest {
         assertArrayEquals(key1, e1_1.key());
         assertArrayEquals(val1_1, e1_1.value());
         assertEquals(1, e1_1.revision());
+        assertEquals(1, e1_1.updateCounter());
         assertEquals(3, storage.revision());
+        assertEquals(3, storage.updateCounter());
 
         // Entry with rev == 3.
         Entry e1_3 = storage.get(key1);
@@ -82,7 +91,9 @@ class SimpleInMemoryKeyValueStorageTest {
         assertArrayEquals(key1, e1_3.key());
         assertArrayEquals(val1_3, e1_3.value());
         assertEquals(3, e1_3.revision());
+        assertEquals(3, e1_3.updateCounter());
         assertEquals(3, storage.revision());
+        assertEquals(3, storage.updateCounter());
 
         // Remove existing entry.
         Entry e2_2 = storage.remove(key2);
@@ -92,7 +103,9 @@ class SimpleInMemoryKeyValueStorageTest {
         assertArrayEquals(key2, e2_2.key());
         assertArrayEquals(val2_2, e2_2.value());
         assertEquals(2, e2_2.revision());
+        assertEquals(2, e2_2.updateCounter());
         assertEquals(4, storage.revision()); // Storage revision is changed.
+        assertEquals(4, storage.updateCounter());
 
         // Remove already removed entry.
         Entry tombstoneEntry = storage.remove(key2);
@@ -100,11 +113,13 @@ class SimpleInMemoryKeyValueStorageTest {
         assertFalse(tombstoneEntry.empty());
         assertTrue(tombstoneEntry.tombstone());
         assertEquals(4, storage.revision()); // Storage revision is not changed.
+        assertEquals(4, storage.updateCounter());
 
         // Compact and check that tombstones are removed.
         storage.compact();
 
         assertEquals(4, storage.revision());
+        assertEquals(4, storage.updateCounter());
         assertTrue(storage.remove(key2).empty());
         assertTrue(storage.get(key2).empty());
 
@@ -116,7 +131,9 @@ class SimpleInMemoryKeyValueStorageTest {
         assertArrayEquals(key1, e1_3.key());
         assertArrayEquals(val1_3, e1_3.value());
         assertEquals(3, e1_3.revision());
+        assertEquals(3, e1_3.updateCounter());
         assertEquals(5, storage.revision()); // Storage revision is changed.
+        assertEquals(5, storage.updateCounter());
 
         // Remove already removed entry.
         tombstoneEntry = storage.remove(key1);
@@ -124,11 +141,13 @@ class SimpleInMemoryKeyValueStorageTest {
         assertFalse(tombstoneEntry.empty());
         assertTrue(tombstoneEntry.tombstone());
         assertEquals(5, storage.revision()); // // Storage revision is not changed.
+        assertEquals(5, storage.updateCounter());
 
         // Compact and check that tombstones are removed.
         storage.compact();
 
         assertEquals(5, storage.revision());
+        assertEquals(5, storage.updateCounter());
         assertTrue(storage.remove(key1).empty());
         assertTrue(storage.get(key1).empty());
     }
@@ -136,33 +155,40 @@ class SimpleInMemoryKeyValueStorageTest {
     @Test
     void compact() {
         assertEquals(0, storage.revision());
+        assertEquals(0, storage.updateCounter());
 
         // Compact empty.
         storage.compact();
 
         assertEquals(0, storage.revision());
+        assertEquals(0, storage.updateCounter());
 
         // Compact non-empty.
         fill(storage, 1, 1);
 
         assertEquals(1, storage.revision());
+        assertEquals(1, storage.updateCounter());
 
         fill(storage, 2, 2);
 
         assertEquals(3, storage.revision());
+        assertEquals(3, storage.updateCounter());
 
         fill(storage, 3, 3);
 
         assertEquals(6, storage.revision());
+        assertEquals(6, storage.updateCounter());
 
         storage.remove(k(3));
 
         assertEquals(7, storage.revision());
+        assertEquals(7, storage.updateCounter());
         assertTrue(storage.get(k(3)).tombstone());
 
         storage.compact();
 
         assertEquals(7, storage.revision());
+        assertEquals(7, storage.updateCounter());
 
         Entry e1 = storage.get(k(1));
 
@@ -171,6 +197,7 @@ class SimpleInMemoryKeyValueStorageTest {
         assertArrayEquals(k(1), e1.key());
         assertArrayEquals(kv(1,1), e1.value());
         assertEquals(1, e1.revision());
+        assertEquals(1, e1.updateCounter());
 
         Entry e2 = storage.get(k(2));
 
@@ -180,6 +207,7 @@ class SimpleInMemoryKeyValueStorageTest {
         assertArrayEquals(kv(2,2), e2.value());
         assertTrue(storage.get(k(2), 2).empty());
         assertEquals(3, e2.revision());
+        assertEquals(3, e2.updateCounter());
 
         Entry e3 = storage.get(k(3));
 
@@ -200,6 +228,7 @@ class SimpleInMemoryKeyValueStorageTest {
         fill("zoo", storage, expZooMap);
 
         assertEquals(300, storage.revision());
+        assertEquals(300, storage.updateCounter());
 
         assertIterate("key", storage, expKeyMap);
         assertIterate("zoo", storage, expZooMap);

[ignite-3] 06/08: IGNITE-14389 Added putAll and removeAll. Started cursor management: ranges and watches (WIP)

Posted by ag...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

agura pushed a commit to branch ignite-14389
in repository https://gitbox.apache.org/repos/asf/ignite-3.git

commit ee2c7adf51ff11148e9c359893217c86a866a9cc
Author: Andrey Gura <ag...@apache.org>
AuthorDate: Fri Apr 23 02:35:28 2021 +0300

    IGNITE-14389 Added putAll and removeAll. Started cursor management: ranges and watches (WIP)
---
 .../metastorage/server/KeyValueStorage.java        |  10 +
 .../server/SimpleInMemoryKeyValueStorage.java      | 186 +++++++++-
 .../server/SimpleInMemoryKeyValueStorageTest.java  | 380 +++++++++++++++++++++
 3 files changed, 565 insertions(+), 11 deletions(-)

diff --git a/modules/metastorage-server/src/main/java/org/apache/ignite/internal/metastorage/server/KeyValueStorage.java b/modules/metastorage-server/src/main/java/org/apache/ignite/internal/metastorage/server/KeyValueStorage.java
index 0f18ece..526e4fb 100644
--- a/modules/metastorage-server/src/main/java/org/apache/ignite/internal/metastorage/server/KeyValueStorage.java
+++ b/modules/metastorage-server/src/main/java/org/apache/ignite/internal/metastorage/server/KeyValueStorage.java
@@ -31,11 +31,21 @@ public interface KeyValueStorage {
 
     void putAll(List<byte[]> keys, List<byte[]> values);
 
+    @NotNull
+    Collection<Entry> getAndPutAll(List<byte[]> keys, List<byte[]> values);
+
     void remove(byte[] key);
 
     @NotNull
     Entry getAndRemove(byte[] key);
 
+    void removeAll(List<byte[]> keys); // Bulk counterpart of remove(byte[]).
+
+    @NotNull
+    Collection<Entry> getAndRemoveAll(List<byte[]> keys);
+
+    Iterator<Entry> range(byte[] keyFrom, byte[] keyTo);
+
     Iterator<Entry> iterate(byte[] key);
 
     //Iterator<Entry> iterate(long rev);
diff --git a/modules/metastorage-server/src/main/java/org/apache/ignite/internal/metastorage/server/SimpleInMemoryKeyValueStorage.java b/modules/metastorage-server/src/main/java/org/apache/ignite/internal/metastorage/server/SimpleInMemoryKeyValueStorage.java
index f532005..32f720e 100644
--- a/modules/metastorage-server/src/main/java/org/apache/ignite/internal/metastorage/server/SimpleInMemoryKeyValueStorage.java
+++ b/modules/metastorage-server/src/main/java/org/apache/ignite/internal/metastorage/server/SimpleInMemoryKeyValueStorage.java
@@ -1,15 +1,9 @@
 package org.apache.ignite.internal.metastorage.server;
 
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Comparator;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.NavigableMap;
-import java.util.NoSuchElementException;
-import java.util.TreeMap;
+import java.util.*;
+import java.util.function.Consumer;
+
+import org.apache.ignite.metastorage.common.Cursor;
 import org.jetbrains.annotations.NotNull;
 import org.jetbrains.annotations.TestOnly;
 
@@ -25,6 +19,8 @@ public class SimpleInMemoryKeyValueStorage implements KeyValueStorage {
 
     private final Watcher watcher;
 
+    private final List<Cursor<Entry>> rangeCursors = new ArrayList<>();
+
     private NavigableMap<byte[], List<Long>> keysIdx = new TreeMap<>(LEXICOGRAPHIC_COMPARATOR);
 
     private NavigableMap<Long, NavigableMap<byte[], Value>> revsIdx = new TreeMap<>();
@@ -66,8 +62,25 @@ public class SimpleInMemoryKeyValueStorage implements KeyValueStorage {
     @Override
     public void putAll(List<byte[]> keys, List<byte[]> values) {
         synchronized (mux) {
+            long curRev = rev + 1;
+
+            doPutAll(curRev, keys, values);
+        }
+    }
+
+    @Override
+    public @NotNull Collection<Entry> getAndPutAll(List<byte[]> keys, List<byte[]> values) {
+        Collection<Entry> res;
+
+        synchronized (mux) {
+            long curRev = rev + 1;
 
+            res = doGetAll(keys, curRev);
+
+            doPutAll(curRev, keys, values);
         }
+
+        return res;
     }
 
     @NotNull
@@ -119,6 +132,69 @@ public class SimpleInMemoryKeyValueStorage implements KeyValueStorage {
         }
     }
 
+    /** Writes a tombstone for every currently live key from {@code keys}, all under one new revision. */
+    @Override
+    public void removeAll(List<byte[]> keys) {
+        synchronized (mux) {
+            long newRev = rev + 1;
+
+            List<byte[]> liveKeys = new ArrayList<>(keys.size());
+            List<byte[]> tombstones = new ArrayList<>(keys.size());
+
+            for (byte[] key : keys) {
+                Entry latest = doGet(key, LATEST_REV, false);
+
+                // Absent keys and keys that are already tombstones get no new tombstone.
+                boolean live = !latest.empty() && !latest.tombstone();
+
+                if (live) {
+                    liveKeys.add(key);
+                    tombstones.add(TOMBSTONE);
+                }
+            }
+
+            // Keys and tombstones are positionally paired.
+            doPutAll(newRev, liveKeys, tombstones);
+        }
+    }
+
+    /**
+     * Same as {@link #removeAll(List)}, but also returns the latest entry of each requested key as it was
+     * before the removal (empty and tombstone entries included), in the order of {@code keys}.
+     */
+    @Override
+    public @NotNull Collection<Entry> getAndRemoveAll(List<byte[]> keys) {
+        Collection<Entry> prevEntries = new ArrayList<>(keys.size());
+
+        synchronized (mux) {
+            long newRev = rev + 1;
+
+            List<byte[]> liveKeys = new ArrayList<>(keys.size());
+            List<byte[]> tombstones = new ArrayList<>(keys.size());
+
+            for (byte[] key : keys) {
+                Entry latest = doGet(key, LATEST_REV, false);
+
+                prevEntries.add(latest);
+
+                // Only live keys receive a tombstone.
+                if (!latest.empty() && !latest.tombstone()) {
+                    liveKeys.add(key);
+                    tombstones.add(TOMBSTONE);
+                }
+            }
+
+            doPutAll(newRev, liveKeys, tombstones);
+        }
+
+        return prevEntries;
+    }
+
+    @Override
+    public Iterator<Entry> range(byte[] keyFrom, byte[] keyTo) {
+        return null; // NOTE(review): placeholder - always null, both bounds are ignored.
+    }
+
     @Override public Iterator<Entry> iterate(byte[] keyFrom) {
         synchronized (mux) {
             NavigableMap<byte[], List<Long>> tailMap = keysIdx.tailMap(keyFrom, true);
@@ -237,7 +313,7 @@ public class SimpleInMemoryKeyValueStorage implements KeyValueStorage {
     private Collection<Entry> doGetAll(List<byte[]> keys, long rev) {
         assert keys != null : "keys list can't be null.";
         assert !keys.isEmpty() : "keys list can't be empty.";
-        assert rev > 0 : "Revision must be positive.";
+        assert rev > 0 || rev == LATEST_REV: "Revision must be positive.";
 
         Collection<Entry> res = new ArrayList<>(keys.size());
 
@@ -344,6 +420,39 @@ public class SimpleInMemoryKeyValueStorage implements KeyValueStorage {
         return lastRev;
     }
 
+    /**
+     * Writes every key/value pair under revision {@code curRev} and makes it the storage revision.
+     * Each written key consumes the next update counter value; an empty batch only moves the revision.
+     *
+     * @param curRev Revision assigned to all written entries.
+     * @param keys Keys to write.
+     * @param bytesList Values, positionally paired with {@code keys}.
+     * @return {@code curRev}.
+     */
+    private long doPutAll(long curRev, List<byte[]> keys, List<byte[]> bytesList) {
+        assert keys.size() == bytesList.size() : "Keys and values lists must have the same size.";
+
+        synchronized (mux) {
+            NavigableMap<byte[], Value> entries = new TreeMap<>(LEXICOGRAPHIC_COMPARATOR);
+
+            for (int i = 0; i < keys.size(); i++) {
+                byte[] key = keys.get(i);
+
+                entries.put(key, new Value(bytesList.get(i), ++updCntr));
+                // Update keysIdx.
+                keysIdx.computeIfAbsent(key, k -> new ArrayList<>()).add(curRev);
+            }
+
+            // Update revsIdx once per batch instead of once per key.
+            if (!entries.isEmpty())
+                revsIdx.put(curRev, entries);
+
+            rev = curRev;
+            return curRev;
+        }
+    }
+
+
     private static boolean isPrefix(byte[] pref, byte[] term) {
         if (pref.length > term.length)
             return false;
@@ -368,4 +477,59 @@ public class SimpleInMemoryKeyValueStorage implements KeyValueStorage {
         return res;
     }
 
+    private class RangeCursor implements Cursor<Entry> { // Work in progress: iteration below is only a skeleton.
+        private final byte[] keyFrom; // Stored by the constructor, not read anywhere in this class yet.
+        private final byte[] keyTo; // Stored by the constructor, not read anywhere in this class yet.
+        private final long rev; // Stored by the constructor, not read anywhere in this class yet.
+        private byte[] curKey; // Lookup position for the iterator; never assigned in this class.
+
+        public RangeCursor(byte[] keyFrom, byte[] keyTo, long rev) {
+            this.keyFrom = keyFrom;
+            this.keyTo = keyTo;
+            this.rev = rev;
+        }
+
+        @Override public void close() throws Exception {
+            // No-op.
+        }
+
+        @NotNull
+        @Override public Iterator<Entry> iterator() {
+            return new Iterator<Entry>() {
+                @Override public boolean hasNext() {
+                    synchronized (mux) {
+                        byte[] key = keysIdx.ceilingKey(curKey); // NOTE(review): curKey is still null here - confirm the comparator tolerates null.
+
+                        return key != null; // keyTo is not consulted.
+                    }
+                }
+
+                @Override public Entry next() {
+                    synchronized (mux) {
+                        Map.Entry<byte[], List<Long>> e = keysIdx.ceilingEntry(curKey);
+
+                        if (e == null)
+                            throw new NoSuchElementException();
+
+                        List<Long> revs = e.getValue();
+
+                        assert revs != null && !revs.isEmpty() :
+                                "Revisions should not be empty: [revs=" + revs + ']';
+
+                        // TODO: build the entry from revs (e.g. via lastRevision) and advance curKey.
+
+                        return null; // Placeholder result.
+                    }
+                }
+            };
+        }
+
+        @Override public void forEach(Consumer<? super Entry> action) {
+            Cursor.super.forEach(action); // Plain delegation to the Cursor default.
+        }
+
+        @Override public Spliterator<Entry> spliterator() {
+            return Cursor.super.spliterator(); // Plain delegation to the Cursor default.
+        }
+    }
 }
diff --git a/modules/metastorage-server/src/test/java/org/apache/ignite/internal/metastorage/server/SimpleInMemoryKeyValueStorageTest.java b/modules/metastorage-server/src/test/java/org/apache/ignite/internal/metastorage/server/SimpleInMemoryKeyValueStorageTest.java
index fa130e6..4a73137 100644
--- a/modules/metastorage-server/src/test/java/org/apache/ignite/internal/metastorage/server/SimpleInMemoryKeyValueStorageTest.java
+++ b/modules/metastorage-server/src/test/java/org/apache/ignite/internal/metastorage/server/SimpleInMemoryKeyValueStorageTest.java
@@ -272,6 +272,191 @@ class SimpleInMemoryKeyValueStorageTest {
     }
 
     @Test
+    public void putAll() {
+        byte[] key1 = k(1);
+        byte[] val1 = kv(1, 1);
+
+        byte[] key2 = k(2);
+        byte[] val2_1 = kv(2, 21);
+        byte[] val2_2 = kv(2, 22);
+
+        byte[] key3 = k(3);
+        byte[] val3_1 = kv(3, 31);
+        byte[] val3_2 = kv(3, 32);
+
+        byte[] key4 = k(4);
+
+        assertEquals(0, storage.revision());
+        assertEquals(0, storage.updateCounter());
+
+        // Must be rewritten.
+        storage.put(key2, val2_1);
+
+        // Remove. Tombstone must be replaced by new value.
+        storage.put(key3, val3_1);
+        storage.remove(key3);
+
+        assertEquals(3, storage.revision());
+        assertEquals(3, storage.updateCounter());
+
+        storage.putAll(List.of(key1, key2, key3), List.of(val1, val2_2, val3_2));
+
+        assertEquals(4, storage.revision());
+        assertEquals(6, storage.updateCounter());
+
+        Collection<Entry> entries = storage.getAll(List.of(key1, key2, key3, key4));
+
+        assertEquals(4, entries.size());
+
+        Map<Key, Entry> map =  entries.stream().collect(Collectors.toMap(e -> new Key(e.key()), Function.identity()));
+
+        // Test regular put value.
+        Entry e1 = map.get(new Key(key1));
+
+        assertNotNull(e1);
+        assertEquals(4, e1.revision());
+        assertEquals(4, e1.updateCounter());
+        assertFalse(e1.tombstone());
+        assertFalse(e1.empty());
+        assertArrayEquals(val1, e1.value());
+
+        // Test rewritten value.
+        Entry e2 = map.get(new Key(key2));
+
+        assertNotNull(e2);
+        assertEquals(4, e2.revision());
+        assertEquals(5, e2.updateCounter());
+        assertFalse(e2.tombstone());
+        assertFalse(e2.empty());
+        assertArrayEquals(val2_2, e2.value());
+
+        // Test removed value.
+        Entry e3 = map.get(new Key(key3));
+
+        assertNotNull(e3);
+        assertEquals(4, e3.revision());
+        assertEquals(6, e3.updateCounter());
+        assertFalse(e3.tombstone());
+        assertFalse(e3.empty());
+
+        // Test empty value.
+        Entry e4 = map.get(new Key(key4));
+
+        assertNotNull(e4);
+        assertFalse(e4.tombstone());
+        assertTrue(e4.empty());
+    }
+
+    @Test
+    public void getAndPutAll() {
+        byte[] key1 = k(1);
+        byte[] val1 = kv(1, 1);
+
+        byte[] key2 = k(2);
+        byte[] val2_1 = kv(2, 21);
+        byte[] val2_2 = kv(2, 22);
+
+        byte[] key3 = k(3);
+        byte[] val3_1 = kv(3, 31);
+        byte[] val3_2 = kv(3, 32);
+
+        byte[] key4 = k(4);
+
+        assertEquals(0, storage.revision());
+        assertEquals(0, storage.updateCounter());
+
+        // key2 gets an initial value that the batch must overwrite.
+        storage.put(key2, val2_1);
+
+        // key3 is put and removed, so the batch must replace its tombstone.
+        storage.put(key3, val3_1);
+        storage.remove(key3);
+
+        assertEquals(3, storage.revision());
+        assertEquals(3, storage.updateCounter());
+
+        Collection<Entry> entries = storage.getAndPutAll(List.of(key1, key2, key3), List.of(val1, val2_2, val3_2));
+
+        assertEquals(4, storage.revision()); // One revision for the whole batch.
+        assertEquals(6, storage.updateCounter()); // One counter step per written key.
+
+        assertEquals(3, entries.size());
+
+        Map<Key, Entry> map =  entries.stream().collect(Collectors.toMap(e -> new Key(e.key()), Function.identity()));
+
+        // Returned entry for key1: the key did not exist before the batch, so it is empty.
+        Entry e1 = map.get(new Key(key1));
+
+        assertNotNull(e1);
+        assertEquals(0, e1.revision());
+        assertEquals(0, e1.updateCounter());
+        assertFalse(e1.tombstone());
+        assertTrue(e1.empty());
+
+        // Returned entry for key2: the value that was overwritten.
+        Entry e2 = map.get(new Key(key2));
+
+        assertNotNull(e2);
+        assertEquals(1, e2.revision());
+        assertEquals(1, e2.updateCounter());
+        assertFalse(e2.tombstone());
+        assertFalse(e2.empty());
+        assertArrayEquals(val2_1, e2.value());
+
+        // Returned entry for key3: the tombstone left by the earlier remove.
+        Entry e3 = map.get(new Key(key3));
+
+        assertNotNull(e3);
+        assertEquals(3, e3.revision());
+        assertEquals(3, e3.updateCounter());
+        assertTrue(e3.tombstone());
+        assertFalse(e3.empty());
+
+        // Test state after putAll.
+        entries = storage.getAll(List.of(key1, key2, key3, key4));
+
+        assertEquals(4, entries.size());
+
+        map =  entries.stream().collect(Collectors.toMap(e -> new Key(e.key()), Function.identity()));
+
+        // key1 now holds the value written by the batch.
+        e1 = map.get(new Key(key1));
+
+        assertNotNull(e1);
+        assertEquals(4, e1.revision());
+        assertEquals(4, e1.updateCounter());
+        assertFalse(e1.tombstone());
+        assertFalse(e1.empty());
+        assertArrayEquals(val1, e1.value());
+
+        // key2 now holds the rewritten value.
+        e2 = map.get(new Key(key2));
+
+        assertNotNull(e2);
+        assertEquals(4, e2.revision());
+        assertEquals(5, e2.updateCounter());
+        assertFalse(e2.tombstone());
+        assertFalse(e2.empty());
+        assertArrayEquals(val2_2, e2.value());
+
+        // key3 is live again: its tombstone was replaced by the batch.
+        e3 = map.get(new Key(key3));
+
+        assertNotNull(e3);
+        assertEquals(4, e3.revision());
+        assertEquals(6, e3.updateCounter());
+        assertFalse(e3.tombstone());
+        assertFalse(e3.empty());
+
+        // key4 was never written, so its entry is empty.
+        Entry e4 = map.get(new Key(key4));
+
+        assertNotNull(e4);
+        assertFalse(e4.tombstone());
+        assertTrue(e4.empty());
+    }
+
+    @Test
     public void remove() {
         byte[] key = k(1);
         byte[] val = kv(1, 1);
@@ -377,6 +562,201 @@ class SimpleInMemoryKeyValueStorageTest {
     }
 
     @Test
+    public void removeAll() {
+        byte[] key1 = k(1);
+        byte[] val1 = kv(1, 1);
+
+        byte[] key2 = k(2);
+        byte[] val2_1 = kv(2, 21);
+        byte[] val2_2 = kv(2, 22);
+
+        byte[] key3 = k(3);
+        byte[] val3_1 = kv(3, 31);
+
+        byte[] key4 = k(4);
+
+        assertEquals(0, storage.revision());
+        assertEquals(0, storage.updateCounter());
+
+        // Regular put.
+        storage.put(key1, val1);
+
+        // Rewrite.
+        storage.put(key2, val2_1);
+        storage.put(key2, val2_2);
+
+        // Remove. Tombstone must not be removed again.
+        storage.put(key3, val3_1);
+        storage.remove(key3);
+
+        assertEquals(5, storage.revision());
+        assertEquals(5, storage.updateCounter());
+
+        storage.removeAll(List.of(key1, key2, key3, key4));
+
+        assertEquals(6, storage.revision());
+        assertEquals(7, storage.updateCounter()); // Only two keys are updated.
+
+        Collection<Entry> entries = storage.getAll(List.of(key1, key2, key3, key4));
+
+        assertEquals(4, entries.size());
+
+        Map<Key, Entry> map =  entries.stream().collect(Collectors.toMap(e -> new Key(e.key()), Function.identity()));
+
+        // Test regular put value.
+        Entry e1 = map.get(new Key(key1));
+
+        assertNotNull(e1);
+        assertEquals(6, e1.revision());
+        assertEquals(6, e1.updateCounter());
+        assertTrue(e1.tombstone());
+        assertFalse(e1.empty());
+
+        // Test rewritten value.
+        Entry e2 = map.get(new Key(key2));
+
+        assertNotNull(e2);
+        assertEquals(6, e2.revision());
+        assertEquals(7, e2.updateCounter());
+        assertTrue(e2.tombstone());
+        assertFalse(e2.empty());
+
+        // Test removed value.
+        Entry e3 = map.get(new Key(key3));
+
+        assertNotNull(e3);
+        assertEquals(5, e3.revision());
+        assertEquals(5, e3.updateCounter());
+        assertTrue(e3.tombstone());
+        assertFalse(e3.empty());
+
+        // Test empty value.
+        Entry e4 = map.get(new Key(key4));
+
+        assertNotNull(e4);
+        assertFalse(e4.tombstone());
+        assertTrue(e4.empty());
+    }
+
+    @Test
+    public void getAndRemoveAll() {
+        byte[] key1 = k(1);
+        byte[] val1 = kv(1, 1);
+
+        byte[] key2 = k(2);
+        byte[] val2_1 = kv(2, 21);
+        byte[] val2_2 = kv(2, 22);
+
+        byte[] key3 = k(3);
+        byte[] val3_1 = kv(3, 31);
+
+        byte[] key4 = k(4);
+
+        assertEquals(0, storage.revision());
+        assertEquals(0, storage.updateCounter());
+
+        // Regular put.
+        storage.put(key1, val1);
+
+        // Rewrite.
+        storage.put(key2, val2_1);
+        storage.put(key2, val2_2);
+
+        // Remove. Tombstone must not be removed again.
+        storage.put(key3, val3_1);
+        storage.remove(key3);
+
+        assertEquals(5, storage.revision());
+        assertEquals(5, storage.updateCounter());
+
+        Collection<Entry> entries = storage.getAndRemoveAll(List.of(key1, key2, key3, key4));
+
+        assertEquals(6, storage.revision());
+        assertEquals(7, storage.updateCounter()); // Only two keys are updated.
+
+        assertEquals(4, entries.size());
+
+        Map<Key, Entry> map =  entries.stream().collect(Collectors.toMap(e -> new Key(e.key()), Function.identity()));
+
+        // Returned entry for key1: the value that was live before the removal.
+        Entry e1 = map.get(new Key(key1));
+
+        assertNotNull(e1);
+        assertEquals(1, e1.revision());
+        assertEquals(1, e1.updateCounter());
+        assertFalse(e1.tombstone());
+        assertFalse(e1.empty());
+        assertArrayEquals(val1, e1.value());
+
+        // Returned entry for key2: the latest rewritten value before the removal.
+        Entry e2 = map.get(new Key(key2));
+
+        assertNotNull(e2);
+        assertEquals(3, e2.revision());
+        assertEquals(3, e2.updateCounter());
+        assertFalse(e2.tombstone());
+        assertFalse(e2.empty());
+        assertArrayEquals(val2_2, e2.value());
+
+        // Returned entry for key3: the tombstone left by the earlier remove.
+        Entry e3 = map.get(new Key(key3));
+
+        assertNotNull(e3);
+        assertEquals(5, e3.revision());
+        assertEquals(5, e3.updateCounter());
+        assertTrue(e3.tombstone());
+        assertFalse(e3.empty());
+
+        // Returned entry for key4: the key never existed, so it is empty.
+        Entry e4 = map.get(new Key(key4));
+
+        assertNotNull(e4);
+        assertFalse(e4.tombstone());
+        assertTrue(e4.empty());
+
+        // Test state after getAndRemoveAll.
+        entries = storage.getAll(List.of(key1, key2, key3, key4));
+
+        assertEquals(4, entries.size());
+
+        map =  entries.stream().collect(Collectors.toMap(e -> new Key(e.key()), Function.identity()));
+
+        // key1 is now a tombstone written by the batch.
+        e1 = map.get(new Key(key1));
+
+        assertNotNull(e1);
+        assertEquals(6, e1.revision());
+        assertEquals(6, e1.updateCounter());
+        assertTrue(e1.tombstone());
+        assertFalse(e1.empty());
+
+        // key2 is now a tombstone written by the batch.
+        e2 = map.get(new Key(key2));
+
+        assertNotNull(e2);
+        assertEquals(6, e2.revision());
+        assertEquals(7, e2.updateCounter());
+        assertTrue(e2.tombstone());
+        assertFalse(e2.empty());
+
+        // key3 keeps its earlier tombstone: revision and counter are unchanged.
+        e3 = map.get(new Key(key3));
+
+        assertNotNull(e3);
+        assertEquals(5, e3.revision());
+        assertEquals(5, e3.updateCounter());
+        assertTrue(e3.tombstone());
+        assertFalse(e3.empty());
+
+        // key4 is still empty.
+        e4 = map.get(new Key(key4));
+
+        assertNotNull(e4);
+        assertFalse(e4.tombstone());
+        assertTrue(e4.empty());
+    }
+
+    @Test
     public void getAfterRemove() {
         byte[] key = k(1);
         byte[] val = kv(1, 1);