You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by ag...@apache.org on 2021/04/28 23:44:50 UTC

[ignite-3] 07/07: IGNITE-14389 Implemented cursor for ranges and watches.

This is an automated email from the ASF dual-hosted git repository.

agura pushed a commit to branch ignite-14389
in repository https://gitbox.apache.org/repos/asf/ignite-3.git

commit bab8272cb4d361cc9e925861529edc7a06d2d761
Author: Andrey Gura <ag...@apache.org>
AuthorDate: Thu Apr 29 02:44:00 2021 +0300

    IGNITE-14389 Implemented cursor for ranges and watches.
---
 .../ignite/internal/metastorage/server/Entry.java  |  17 +
 .../internal/metastorage/server/EntryEvent.java    |  58 +++
 .../metastorage/server/KeyValueStorage.java        |  33 +-
 .../server/SimpleInMemoryKeyValueStorage.java      | 333 +++++++++------
 .../ignite/internal/metastorage/server/Value.java  |  17 +
 .../ignite/internal/metastorage/server/Watch.java  |  45 ---
 .../internal/metastorage/server/WatchEvent.java    |  54 +++
 .../internal/metastorage/server/Watcher.java       |  13 -
 .../internal/metastorage/server/WatcherImpl.java   |  58 ---
 .../server/SimpleInMemoryKeyValueStorageTest.java  | 446 ++++++++++++++++++---
 10 files changed, 764 insertions(+), 310 deletions(-)

diff --git a/modules/metastorage-server/src/main/java/org/apache/ignite/internal/metastorage/server/Entry.java b/modules/metastorage-server/src/main/java/org/apache/ignite/internal/metastorage/server/Entry.java
index 263a88b..87b5471 100644
--- a/modules/metastorage-server/src/main/java/org/apache/ignite/internal/metastorage/server/Entry.java
+++ b/modules/metastorage-server/src/main/java/org/apache/ignite/internal/metastorage/server/Entry.java
@@ -1,3 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
 package org.apache.ignite.internal.metastorage.server;
 
 import org.jetbrains.annotations.NotNull;
diff --git a/modules/metastorage-server/src/main/java/org/apache/ignite/internal/metastorage/server/EntryEvent.java b/modules/metastorage-server/src/main/java/org/apache/ignite/internal/metastorage/server/EntryEvent.java
new file mode 100644
index 0000000..554a3a7
--- /dev/null
+++ b/modules/metastorage-server/src/main/java/org/apache/ignite/internal/metastorage/server/EntryEvent.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.metastorage.server;
+
+/**
+ * Represents a change event for a particular key and entry.
+ */
+public class EntryEvent {
+    /** Old (previous) entry. */
+    private final Entry oldEntry;
+
+    /** New (current) entry. */
+    private final Entry entry;
+
+    /**
+     * Constructs event with given old and new entries.
+     *
+     * @param oldEntry Old entry.
+     * @param curEntry New entry.
+     */
+    EntryEvent(Entry oldEntry, Entry curEntry) {
+        this.oldEntry = oldEntry;
+        this.entry = curEntry;
+    }
+
+    /**
+     * Returns old entry.
+     *
+     * @return Old entry.
+     */
+    public Entry oldEntry() {
+        return oldEntry;
+    }
+
+    /**
+     * Returns new entry.
+     *
+     * @return New entry.
+     */
+    public Entry entry() {
+        return entry;
+    }
+}
diff --git a/modules/metastorage-server/src/main/java/org/apache/ignite/internal/metastorage/server/KeyValueStorage.java b/modules/metastorage-server/src/main/java/org/apache/ignite/internal/metastorage/server/KeyValueStorage.java
index 526e4fb..5d6da44 100644
--- a/modules/metastorage-server/src/main/java/org/apache/ignite/internal/metastorage/server/KeyValueStorage.java
+++ b/modules/metastorage-server/src/main/java/org/apache/ignite/internal/metastorage/server/KeyValueStorage.java
@@ -1,13 +1,28 @@
-package org.apache.ignite.internal.metastorage.server;
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
 
-import org.jetbrains.annotations.NotNull;
+package org.apache.ignite.internal.metastorage.server;
 
 import java.util.Collection;
-import java.util.Iterator;
 import java.util.List;
+import org.apache.ignite.metastorage.common.Cursor;
+import org.jetbrains.annotations.NotNull;
 
 public interface KeyValueStorage {
-
     long revision();
 
     long updateCounter();
@@ -44,11 +59,15 @@ public interface KeyValueStorage {
     @NotNull
     Collection<Entry> getAndRemoveAll(List<byte[]> keys);
 
-    Iterator<Entry> range(byte[] keyFrom, byte[] keyTo);
+    Cursor<Entry> range(byte[] keyFrom, byte[] keyTo);
+
+    Cursor<Entry> range(byte[] keyFrom, byte[] keyTo, long revUpperBound);
+
+    Cursor<WatchEvent> watch(byte[] keyFrom, byte[] keyTo, long rev);
 
-    Iterator<Entry> iterate(byte[] key);
+    Cursor<WatchEvent> watch(byte[] key, long rev);
 
-    //Iterator<Entry> iterate(long rev);
+    Cursor<WatchEvent> watch(Collection<byte[]> keys, long rev);
 
     void compact();
 }
diff --git a/modules/metastorage-server/src/main/java/org/apache/ignite/internal/metastorage/server/SimpleInMemoryKeyValueStorage.java b/modules/metastorage-server/src/main/java/org/apache/ignite/internal/metastorage/server/SimpleInMemoryKeyValueStorage.java
index 32f720e..b37c96a 100644
--- a/modules/metastorage-server/src/main/java/org/apache/ignite/internal/metastorage/server/SimpleInMemoryKeyValueStorage.java
+++ b/modules/metastorage-server/src/main/java/org/apache/ignite/internal/metastorage/server/SimpleInMemoryKeyValueStorage.java
@@ -1,8 +1,34 @@
-package org.apache.ignite.internal.metastorage.server;
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
 
-import java.util.*;
-import java.util.function.Consumer;
+package org.apache.ignite.internal.metastorage.server;
 
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Comparator;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.NavigableMap;
+import java.util.NoSuchElementException;
+import java.util.TreeMap;
+import java.util.TreeSet;
+import java.util.function.Predicate;
 import org.apache.ignite.metastorage.common.Cursor;
 import org.jetbrains.annotations.NotNull;
 import org.jetbrains.annotations.TestOnly;
@@ -13,15 +39,11 @@ import static org.apache.ignite.internal.metastorage.server.Value.TOMBSTONE;
  * WARNING: Only for test purposes and only for non-distributed (one static instance) storage.
  */
 public class SimpleInMemoryKeyValueStorage implements KeyValueStorage {
-    private static final Comparator<byte[]> LEXICOGRAPHIC_COMPARATOR = Arrays::compare;
+    private static final Comparator<byte[]> CMP = Arrays::compare;
 
     private static final long LATEST_REV = -1;
 
-    private final Watcher watcher;
-
-    private final List<Cursor<Entry>> rangeCursors = new ArrayList<>();
-
-    private NavigableMap<byte[], List<Long>> keysIdx = new TreeMap<>(LEXICOGRAPHIC_COMPARATOR);
+    private NavigableMap<byte[], List<Long>> keysIdx = new TreeMap<>(CMP);
 
     private NavigableMap<Long, NavigableMap<byte[], Value>> revsIdx = new TreeMap<>();
 
@@ -31,10 +53,6 @@ public class SimpleInMemoryKeyValueStorage implements KeyValueStorage {
 
     private final Object mux = new Object();
 
-    public SimpleInMemoryKeyValueStorage(Watcher watcher) {
-        this.watcher = watcher;
-    }
-
     @Override public long revision() {
         return rev;
     }
@@ -141,9 +159,7 @@ public class SimpleInMemoryKeyValueStorage implements KeyValueStorage {
 
             List<byte[]> vals = new ArrayList<>(keys.size());
 
-            for (int i = 0; i < keys.size(); i++) {
-                byte[] key = keys.get(i);
-
+            for (byte[] key : keys) {
                 Entry e = doGet(key, LATEST_REV, false);
 
                 if (e.empty() || e.tombstone())
@@ -169,9 +185,7 @@ public class SimpleInMemoryKeyValueStorage implements KeyValueStorage {
 
             List<byte[]> vals = new ArrayList<>(keys.size());
 
-            for (int i = 0; i < keys.size(); i++) {
-                byte[] key = keys.get(i);
-
+            for (byte[] key : keys) {
                 Entry e = doGet(key, LATEST_REV, false);
 
                 res.add(e);
@@ -190,90 +204,45 @@ public class SimpleInMemoryKeyValueStorage implements KeyValueStorage {
         return res;
     }
 
-    @Override
-    public Iterator<Entry> range(byte[] keyFrom, byte[] keyTo) {
-        return null;
+    @Override public Cursor<Entry> range(byte[] keyFrom, byte[] keyTo) {
+        return new RangeCursor(keyFrom, keyTo, rev);
     }
 
-    @Override public Iterator<Entry> iterate(byte[] keyFrom) {
-        synchronized (mux) {
-            NavigableMap<byte[], List<Long>> tailMap = keysIdx.tailMap(keyFrom, true);
-
-            final Iterator<Map.Entry<byte[], List<Long>>> it = tailMap.entrySet().iterator();
-
-            return new Iterator<>() {
-                private Map.Entry<byte[], List<Long>> curr;
-                private boolean hasNext;
-
-                private void advance() {
-                    if (it.hasNext()) {
-                        Map.Entry<byte[], List<Long>> e = it.next();
-
-                        byte[] key = e.getKey();
-
-                        if (!isPrefix(keyFrom, key))
-                            hasNext = false;
-                        else {
-                            curr = e;
-
-                            hasNext = true;
-                        }
-                    } else
-                        hasNext = false;
-                }
-
-                @Override
-                public boolean hasNext() {
-                    synchronized (mux) {
-                        if (curr == null)
-                            advance();
-
-                        return hasNext;
-                    }
-                }
-
-                @Override
-                public Entry next() {
-                    synchronized (mux) {
-                        if (!hasNext())
-                            throw new NoSuchElementException();
-
-                        Map.Entry<byte[], List<Long>> e = curr;
-
-                        curr = null;
+    @Override public Cursor<Entry> range(byte[] keyFrom, byte[] keyTo, long revUpperBound) {
+        return new RangeCursor(keyFrom, keyTo, revUpperBound);
+    }
 
-                        byte[] key = e.getKey();
+    @Override public Cursor<WatchEvent> watch(byte[] keyFrom, byte[] keyTo, long rev) {
+        assert keyFrom != null;
+        assert rev > 0;
 
-                        List<Long> revs = e.getValue();
+        return new WatchCursor(rev, k ->
+            CMP.compare(keyFrom, k) <= 0 && (keyTo == null || CMP.compare(k, keyTo) < 0)
+        );
+    }
 
-                        long rev = revs == null || revs.isEmpty() ? 0 : lastRevision(revs);
+    @Override public Cursor<WatchEvent> watch(byte[] key, long rev) {
+        assert key != null;
+        assert rev > 0;
 
-                        if (rev == 0) {
-                            throw new IllegalStateException("rev == 0");
-                            //return new AbstractMap.SimpleImmutableEntry<>(key, null);
-                        }
+        return new WatchCursor(rev, k -> CMP.compare(k, key) == 0);
+    }
 
-                        NavigableMap<byte[], Value> vals = revsIdx.get(rev);
+    @Override public Cursor<WatchEvent> watch(Collection<byte[]> keys, long rev) {
+        assert keys != null && !keys.isEmpty();
+        assert rev > 0;
 
-                        if (vals == null || vals.isEmpty()) {
-                            throw new IllegalStateException("vals == null || vals.isEmpty()");
-                            //return new AbstractMap.SimpleImmutableEntry<>(key, null);
-                        }
+        TreeSet<byte[]> keySet = new TreeSet<>(CMP);
 
-                        Value val = vals.get(key);
+        keySet.addAll(keys);
 
-                        return val.tombstone() ?
-                                Entry.tombstone(key, rev, val.updateCounter()) :
-                                new Entry(key, val.bytes(), rev, val.updateCounter());
-                    }
-                }
-            };
-        }
+        return new WatchCursor(rev, keySet::contains);
     }
 
+
     @Override public void compact() {
         synchronized (mux) {
-            NavigableMap<byte[], List<Long>> compactedKeysIdx = new TreeMap<>(LEXICOGRAPHIC_COMPARATOR);
+            NavigableMap<byte[], List<Long>> compactedKeysIdx = new TreeMap<>(CMP);
 
             NavigableMap<Long, NavigableMap<byte[], Value>> compactedRevsIdx = new TreeMap<>();
 
@@ -302,7 +271,7 @@ public class SimpleInMemoryKeyValueStorage implements KeyValueStorage {
 
             NavigableMap<byte[], Value> compactedKv = compactedRevsIdx.computeIfAbsent(
                     lastRev,
-                    k -> new TreeMap<>(LEXICOGRAPHIC_COMPARATOR)
+                    k -> new TreeMap<>(CMP)
             );
 
             compactedKv.put(key, lastVal);
@@ -409,7 +378,7 @@ public class SimpleInMemoryKeyValueStorage implements KeyValueStorage {
         revs.add(curRev);
 
         // Update revsIdx.
-        NavigableMap<byte[], Value> entries = new TreeMap<>(LEXICOGRAPHIC_COMPARATOR);
+        NavigableMap<byte[], Value> entries = new TreeMap<>(CMP);
 
         Value val = new Value(bytes, curUpdCntr);
 
@@ -423,7 +392,7 @@ public class SimpleInMemoryKeyValueStorage implements KeyValueStorage {
     private long doPutAll(long curRev, List<byte[]> keys, List<byte[]> bytesList) {
         synchronized (mux) {
             // Update revsIdx.
-            NavigableMap<byte[], Value> entries = new TreeMap<>(LEXICOGRAPHIC_COMPARATOR);
+            NavigableMap<byte[], Value> entries = new TreeMap<>(CMP);
 
             for (int i = 0; i < keys.size(); i++) {
                 byte[] key = keys.get(i);
@@ -452,19 +421,6 @@ public class SimpleInMemoryKeyValueStorage implements KeyValueStorage {
         }
     }
 
-
-    private static boolean isPrefix(byte[] pref, byte[] term) {
-        if (pref.length > term.length)
-            return false;
-
-        for (int i = 0; i < pref.length - 1; i++) {
-            if (pref[i] != term[i])
-                return false;
-        }
-
-        return true;
-    }
-
     private static long lastRevision(List<Long> revs) {
         return revs.get(revs.size() - 1);
     }
@@ -481,55 +437,184 @@ public class SimpleInMemoryKeyValueStorage implements KeyValueStorage {
         private final byte[] keyFrom;
         private final byte[] keyTo;
         private final long rev;
-        private byte[] curKey;
+        private Entry nextRetEntry;
+        private byte[] lastRetKey;
+        private boolean finished;
 
-        public RangeCursor(byte[] keyFrom, byte[] keyTo, long rev) {
+        RangeCursor(byte[] keyFrom, byte[] keyTo, long rev) {
             this.keyFrom = keyFrom;
             this.keyTo = keyTo;
             this.rev = rev;
         }
 
         @Override public void close() throws Exception {
-
+            // TODO: implement.
         }
 
         @NotNull
         @Override public Iterator<Entry> iterator() {
-            return new Iterator<Entry>() {
+            return new Iterator<>() {
                 @Override public boolean hasNext() {
                     synchronized (mux) {
-                        byte[] key = keysIdx.ceilingKey(curKey);
+                        while (true) {
+                            if (finished)
+                                return false;
+
+                            if (nextRetEntry != null)
+                                return true;
 
-                        return key != null;
+                            byte[] key = lastRetKey;
+
+                            while (!finished || nextRetEntry == null) {
+                                Map.Entry<byte[], List<Long>> e =
+                                        key == null ? keysIdx.ceilingEntry(keyFrom) : keysIdx.higherEntry(key);
+
+                                if (e == null) {
+                                    finished = true;
+
+                                    break;
+                                }
+
+                                key = e.getKey();
+
+                                if (keyTo != null && CMP.compare(key, keyTo) >= 0) {
+                                    finished = true;
+
+                                    break;
+                                }
+
+                                List<Long> revs = e.getValue();
+
+                                assert revs != null && !revs.isEmpty() :
+                                        "Revisions should not be empty: [revs=" + revs + ']';
+
+                                long lastRev = maxRevision(revs, rev);
+
+                                if (lastRev == -1)
+                                    continue;
+
+                                Entry entry = doGetValue(key, lastRev);
+
+                                assert !entry.empty() : "Iterator should not return empty entry.";
+
+                                nextRetEntry = entry;
+
+                                break;
+                            }
+                        }
                     }
                 }
 
                 @Override public Entry next() {
                     synchronized (mux) {
-                        Map.Entry<byte[], List<Long>> e = keysIdx.ceilingEntry(curKey);
-
-                        if (e == null)
-                            throw new NoSuchElementException();
+                        while (true) {
+                            if (finished)
+                                throw new NoSuchElementException();
 
-                        List<Long> revs = e.getValue();
+                            if (nextRetEntry != null) {
+                                Entry e = nextRetEntry;
 
-                        assert revs != null && !revs.isEmpty() :
-                                "Revisions should not be empty: [revs=" + revs + ']';
+                                nextRetEntry = null;
 
-                        //lastRevision(re)
+                                lastRetKey = e.key();
 
-                        return null;
+                                return e;
+                            } else
+                                hasNext();
+                        }
                     }
                 }
             };
         }
+    }
 
-        @Override public void forEach(Consumer<? super Entry> action) {
-            Cursor.super.forEach(action);
+    private class WatchCursor implements Cursor<WatchEvent> {
+        private final Predicate<byte[]> p;
+        private long lastRetRev;
+        private long nextRetRev = -1;
+
+        WatchCursor(long rev, Predicate<byte[]> p) {
+            this.p = p;
+            this.lastRetRev = rev - 1;
         }
 
-        @Override public Spliterator<Entry> spliterator() {
-            return Cursor.super.spliterator();
+        @Override public void close() throws Exception {
+            // TODO: implement
+        }
+
+        @NotNull
+        @Override public Iterator<WatchEvent> iterator() {
+            return new Iterator<>() {
+                @Override public boolean hasNext() {
+                    synchronized (mux) {
+                        if (nextRetRev != -1)
+                            return true;
+
+                        while (true) {
+                            long curRev = lastRetRev + 1;
+
+                            NavigableMap<byte[], Value> entries = revsIdx.get(curRev);
+
+                            if (entries == null)
+                                return false;
+
+                            for (byte[] key : entries.keySet()) {
+                                if (p.test(key)) {
+                                    nextRetRev = curRev;
+
+                                    return true;
+                                }
+                            }
+
+                            lastRetRev++;
+                        }
+                    }
+                }
+
+                @Override public WatchEvent next() {
+                    synchronized (mux) {
+                        while (true) {
+                            if (nextRetRev != -1) {
+                                NavigableMap<byte[], Value> entries = revsIdx.get(nextRetRev);
+
+                                if (entries == null)
+                                    return null;
+
+                                List<EntryEvent> evts = new ArrayList<>(entries.size());
+
+                                for (Map.Entry<byte[], Value> e : entries.entrySet()) {
+                                    byte[] key = e.getKey();
+
+                                    Value val = e.getValue();
+
+                                    if (p.test(key)) {
+                                        Entry newEntry;
+
+                                        if (val.tombstone())
+                                            newEntry = Entry.tombstone(key, nextRetRev, val.updateCounter());
+                                        else
+                                            newEntry = new Entry(key, val.bytes(), nextRetRev, val.updateCounter());
+
+                                        Entry oldEntry = doGet(key, nextRetRev - 1, false);
+
+                                        evts.add(new EntryEvent(oldEntry, newEntry));
+                                    }
+                                }
+
+                                if (evts.isEmpty())
+                                    continue;
+
+                                lastRetRev = nextRetRev;
+
+                                nextRetRev = -1;
+
+                                return new WatchEvent(evts);
+                            } else if (!hasNext())
+                                return null;
+                        }
+                    }
+                }
+            };
         }
     }
 }
diff --git a/modules/metastorage-server/src/main/java/org/apache/ignite/internal/metastorage/server/Value.java b/modules/metastorage-server/src/main/java/org/apache/ignite/internal/metastorage/server/Value.java
index 250a5ea..a438fd4 100644
--- a/modules/metastorage-server/src/main/java/org/apache/ignite/internal/metastorage/server/Value.java
+++ b/modules/metastorage-server/src/main/java/org/apache/ignite/internal/metastorage/server/Value.java
@@ -1,3 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
 package org.apache.ignite.internal.metastorage.server;
 
 import org.jetbrains.annotations.NotNull;
diff --git a/modules/metastorage-server/src/main/java/org/apache/ignite/internal/metastorage/server/Watch.java b/modules/metastorage-server/src/main/java/org/apache/ignite/internal/metastorage/server/Watch.java
deleted file mode 100644
index 26cfa5c..0000000
--- a/modules/metastorage-server/src/main/java/org/apache/ignite/internal/metastorage/server/Watch.java
+++ /dev/null
@@ -1,45 +0,0 @@
-package org.apache.ignite.internal.metastorage.server;
-
-import org.jetbrains.annotations.Nullable;
-
-import java.util.Arrays;
-import java.util.Comparator;
-
-public class Watch {
-    private static final Comparator<byte[]> CMP = Arrays::compare;
-
-    private static final long ANY_REVISION = -1;
-
-    @Nullable
-    private byte[] startKey;
-
-    @Nullable
-    private byte[] endKey;
-
-    long rev = ANY_REVISION;
-
-    public void startKey(byte[] startKey) {
-        this.startKey = startKey;
-    }
-
-    public void endKey(byte[] endKey) {
-        this.endKey = endKey;
-    }
-
-    public void revision(long rev) {
-        this.rev = rev;
-    }
-
-    public void notify(Entry e) {
-        if (startKey != null && CMP.compare(e.key(), startKey) < 0)
-            return;
-
-        if (endKey != null && CMP.compare(e.key(), endKey) > 0)
-            return;
-
-        if (rev != ANY_REVISION && e.revision() <= rev)
-            return;
-
-        System.out.println("Entry: key=" + new String(e.key()) + ", value=" + new String(e.value()) + ", rev=" + e.revision());
-    }
-}
diff --git a/modules/metastorage-server/src/main/java/org/apache/ignite/internal/metastorage/server/WatchEvent.java b/modules/metastorage-server/src/main/java/org/apache/ignite/internal/metastorage/server/WatchEvent.java
new file mode 100644
index 0000000..561f203
--- /dev/null
+++ b/modules/metastorage-server/src/main/java/org/apache/ignite/internal/metastorage/server/WatchEvent.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.metastorage.server;
+
+import java.util.Collection;
+import java.util.List;
+
+public class WatchEvent {
+    private final List<EntryEvent> entryEvts;
+
+    private final boolean batch;
+
+    /**
+     * Constructs a watch event with the given collection of entry events.
+     *
+     * @param entryEvts Events for entries corresponding to an update under one revision.
+     */
+    public WatchEvent(List<EntryEvent> entryEvts) {
+        assert entryEvts != null && !entryEvts.isEmpty();
+
+        this.batch = entryEvts.size() > 1;
+        this.entryEvts = entryEvts;
+    }
+
+    public boolean batch() {
+        return batch;
+    }
+
+    public Collection<EntryEvent> entryEvents() {
+        return entryEvts;
+    }
+
+    public EntryEvent entryEvent() {
+        if (batch)
+            throw new IllegalStateException("Watch event represents a batch of events.");
+
+        return entryEvts.get(0);
+    }
+}
diff --git a/modules/metastorage-server/src/main/java/org/apache/ignite/internal/metastorage/server/Watcher.java b/modules/metastorage-server/src/main/java/org/apache/ignite/internal/metastorage/server/Watcher.java
deleted file mode 100644
index 5516d06..0000000
--- a/modules/metastorage-server/src/main/java/org/apache/ignite/internal/metastorage/server/Watcher.java
+++ /dev/null
@@ -1,13 +0,0 @@
-package org.apache.ignite.internal.metastorage.server;
-
-import org.jetbrains.annotations.NotNull;
-
-public interface Watcher {
-    void register(@NotNull Watch watch);
-
-    void notify(@NotNull Entry e);
-
-    //TODO: implement
-    void cancel(@NotNull Watch watch);
-}
-
diff --git a/modules/metastorage-server/src/main/java/org/apache/ignite/internal/metastorage/server/WatcherImpl.java b/modules/metastorage-server/src/main/java/org/apache/ignite/internal/metastorage/server/WatcherImpl.java
deleted file mode 100644
index dc126a0..0000000
--- a/modules/metastorage-server/src/main/java/org/apache/ignite/internal/metastorage/server/WatcherImpl.java
+++ /dev/null
@@ -1,58 +0,0 @@
-package org.apache.ignite.internal.metastorage.server;
-
-import org.jetbrains.annotations.NotNull;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.TimeUnit;
-
-public class WatcherImpl implements Watcher {
-    private final BlockingQueue<Entry> queue = new LinkedBlockingQueue<>();
-
-    private final List<Watch> watches = new ArrayList<>();
-
-    private volatile boolean stop;
-
-    private final Object mux = new Object();
-
-    @Override public void register(@NotNull Watch watch) {
-        synchronized (mux) {
-            watches.add(watch);
-        }
-    }
-
-    @Override public void notify(@NotNull Entry e) {
-        queue.offer(e);
-    }
-
-    @Override
-    public void cancel(@NotNull Watch watch) {
-        throw new UnsupportedOperationException("Not implemented yet.");
-    }
-
-    public void shutdown() {
-        stop = true;
-    }
-
-    private class WatcherWorker implements Runnable {
-        @Override public void run() {
-            while (!stop) {
-                try {
-                    Entry e = queue.poll(100, TimeUnit.MILLISECONDS);
-
-                    if (e != null) {
-                        synchronized (mux) {
-                            watches.forEach(w -> w.notify(e));
-                        }
-                    }
-                }
-                catch (InterruptedException interruptedException) {
-                    // No-op.
-                }
-            }
-        }
-    }
-}
-
diff --git a/modules/metastorage-server/src/test/java/org/apache/ignite/internal/metastorage/server/SimpleInMemoryKeyValueStorageTest.java b/modules/metastorage-server/src/test/java/org/apache/ignite/internal/metastorage/server/SimpleInMemoryKeyValueStorageTest.java
index 4a73137..27df790 100644
--- a/modules/metastorage-server/src/test/java/org/apache/ignite/internal/metastorage/server/SimpleInMemoryKeyValueStorageTest.java
+++ b/modules/metastorage-server/src/test/java/org/apache/ignite/internal/metastorage/server/SimpleInMemoryKeyValueStorageTest.java
@@ -1,17 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
 package org.apache.ignite.internal.metastorage.server;
 
 import java.util.Collection;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
-import java.util.TreeMap;
-import java.util.function.Function;
+import java.util.NoSuchElementException;
 import java.util.stream.Collectors;
+import org.apache.ignite.metastorage.common.Cursor;
 import org.apache.ignite.metastorage.common.Key;
-import org.jetbrains.annotations.NotNull;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 
+import static java.util.function.Function.identity;
 import static org.junit.jupiter.api.Assertions.*;
 
 class SimpleInMemoryKeyValueStorageTest {
@@ -19,7 +36,7 @@ class SimpleInMemoryKeyValueStorageTest {
 
     @BeforeEach
     public void setUp() {
-        storage = new SimpleInMemoryKeyValueStorage(new NoOpWatcher());
+        storage = new SimpleInMemoryKeyValueStorage();
     }
 
     @Test
@@ -91,7 +108,7 @@ class SimpleInMemoryKeyValueStorageTest {
 
         assertEquals(4, entries.size());
 
-        Map<Key, Entry> map =  entries.stream().collect(Collectors.toMap(e -> new Key(e.key()), Function.identity()));
+        Map<Key, Entry> map =  entries.stream().collect(Collectors.toMap(e -> new Key(e.key()), identity()));
 
         // Test regular put value.
         Entry e1 = map.get(new Key(key1));
@@ -166,7 +183,7 @@ class SimpleInMemoryKeyValueStorageTest {
 
         assertEquals(4, entries.size());
 
-        Map<Key, Entry> map =  entries.stream().collect(Collectors.toMap(e -> new Key(e.key()), Function.identity()));
+        Map<Key, Entry> map =  entries.stream().collect(Collectors.toMap(e -> new Key(e.key()), identity()));
 
         // Test regular put value.
         Entry e1 = map.get(new Key(key1));
@@ -204,7 +221,7 @@ class SimpleInMemoryKeyValueStorageTest {
 
         assertEquals(4, entries.size());
 
-        map =  entries.stream().collect(Collectors.toMap(e -> new Key(e.key()), Function.identity()));
+        map =  entries.stream().collect(Collectors.toMap(e -> new Key(e.key()), identity()));
 
         // Test regular put value.
         e1 = map.get(new Key(key1));
@@ -308,7 +325,7 @@ class SimpleInMemoryKeyValueStorageTest {
 
         assertEquals(4, entries.size());
 
-        Map<Key, Entry> map =  entries.stream().collect(Collectors.toMap(e -> new Key(e.key()), Function.identity()));
+        Map<Key, Entry> map =  entries.stream().collect(Collectors.toMap(e -> new Key(e.key()), identity()));
 
         // Test regular put value.
         Entry e1 = map.get(new Key(key1));
@@ -382,7 +399,7 @@ class SimpleInMemoryKeyValueStorageTest {
 
         assertEquals(3, entries.size());
 
-        Map<Key, Entry> map =  entries.stream().collect(Collectors.toMap(e -> new Key(e.key()), Function.identity()));
+        Map<Key, Entry> map =  entries.stream().collect(Collectors.toMap(e -> new Key(e.key()), identity()));
 
         // Test regular put value.
         Entry e1 = map.get(new Key(key1));
@@ -417,7 +434,7 @@ class SimpleInMemoryKeyValueStorageTest {
 
         assertEquals(4, entries.size());
 
-        map =  entries.stream().collect(Collectors.toMap(e -> new Key(e.key()), Function.identity()));
+        map =  entries.stream().collect(Collectors.toMap(e -> new Key(e.key()), identity()));
 
         // Test regular put value.
         e1 = map.get(new Key(key1));
@@ -601,7 +618,7 @@ class SimpleInMemoryKeyValueStorageTest {
 
         assertEquals(4, entries.size());
 
-        Map<Key, Entry> map =  entries.stream().collect(Collectors.toMap(e -> new Key(e.key()), Function.identity()));
+        Map<Key, Entry> map =  entries.stream().collect(Collectors.toMap(e -> new Key(e.key()), identity()));
 
         // Test regular put value.
         Entry e1 = map.get(new Key(key1));
@@ -676,7 +693,7 @@ class SimpleInMemoryKeyValueStorageTest {
 
         assertEquals(4, entries.size());
 
-        Map<Key, Entry> map =  entries.stream().collect(Collectors.toMap(e -> new Key(e.key()), Function.identity()));
+        Map<Key, Entry> map =  entries.stream().collect(Collectors.toMap(e -> new Key(e.key()), identity()));
 
         // Test regular put value.
         Entry e1 = map.get(new Key(key1));
@@ -719,7 +736,7 @@ class SimpleInMemoryKeyValueStorageTest {
 
         assertEquals(4, entries.size());
 
-        map =  entries.stream().collect(Collectors.toMap(e -> new Key(e.key()), Function.identity()));
+        map =  entries.stream().collect(Collectors.toMap(e -> new Key(e.key()), identity()));
 
         // Test regular put value.
         e1 = map.get(new Key(key1));
@@ -987,60 +1004,377 @@ class SimpleInMemoryKeyValueStorageTest {
     }
 
     @Test
-    public void iterate() {
-        TreeMap<String, String> expFooMap = new TreeMap<>();
-        TreeMap<String, String> expKeyMap = new TreeMap<>();
-        TreeMap<String, String> expZooMap = new TreeMap<>();
+    public void rangeCursor() {
+        byte[] key1 = k(1);
+        byte[] val1 = kv(1, 1);
 
-        fill("foo", storage, expFooMap);
-        fill("key", storage, expKeyMap);
-        fill("zoo", storage, expZooMap);
+        byte[] key2 = k(2);
+        byte[] val2 = kv(2, 2);
 
-        assertEquals(300, storage.revision());
-        assertEquals(300, storage.updateCounter());
+        byte[] key3 = k(3);
+        byte[] val3 = kv(3, 3);
 
-        assertIterate("key", storage, expKeyMap);
-        assertIterate("zoo", storage, expZooMap);
-        assertIterate("foo", storage, expFooMap);
-    }
 
-    private void assertIterate(String pref,  KeyValueStorage storage, TreeMap<String, String> expMap) {
-        Iterator<Entry> it = storage.iterate((pref + "_").getBytes());
-        Iterator<Map.Entry<String, String>> expIt = expMap.entrySet().iterator();
+        assertEquals(0, storage.revision());
+        assertEquals(0, storage.updateCounter());
+
+        storage.putAll(List.of(key1, key2, key3), List.of(val1, val2, val3));
+
+        assertEquals(1, storage.revision());
+        assertEquals(3, storage.updateCounter());
+
+        // Range for latest revision without max bound.
+        Cursor<Entry> cur = storage.range(key1, null);
+
+        Iterator<Entry> it = cur.iterator();
+
+        assertTrue(it.hasNext());
+
+        Entry e1 = it.next();
+
+        assertFalse(e1.empty());
+        assertFalse(e1.tombstone());
+        assertArrayEquals(key1, e1.key());
+        assertArrayEquals(val1, e1.value());
+        assertEquals(1, e1.revision());
+        assertEquals(1, e1.updateCounter());
+
+        assertTrue(it.hasNext());
+
+        Entry e2 = it.next();
 
-        // Order.
-        while (it.hasNext()) {
-            Entry entry = it.next();
-            Map.Entry<String, String> expEntry = expIt.next();
+        assertFalse(e2.empty());
+        assertFalse(e2.tombstone());
+        assertArrayEquals(key2, e2.key());
+        assertArrayEquals(val2, e2.value());
+        assertEquals(1, e2.revision());
+        assertEquals(2, e2.updateCounter());
+
+        // Deliberately don't call it.hasNext(): next() must advance the cursor on its own.
+
+        Entry e3 = it.next();
+
+        assertFalse(e3.empty());
+        assertFalse(e3.tombstone());
+        assertArrayEquals(key3, e3.key());
+        assertArrayEquals(val3, e3.value());
+        assertEquals(1, e3.revision());
+        assertEquals(3, e3.updateCounter());
+
+        assertFalse(it.hasNext());
 
-            assertEquals(expEntry.getKey(), new String(entry.key()));
-            assertEquals(expEntry.getValue(), new String(entry.value()));
+        try {
+            it.next();
+
+            fail();
+        }
+        catch (NoSuchElementException ignored) {
+            // Expected: the cursor is exhausted, so next() must throw.
+            // No-op.
         }
 
-        // Range boundaries.
-        it = storage.iterate((pref + '_').getBytes());
+        // Range for latest revision with max bound (key3, the upper bound, is not returned).
+        cur = storage.range(key1, key3);
+
+        it = cur.iterator();
+
+        assertTrue(it.hasNext());
+
+        e1 = it.next();
+
+        assertFalse(e1.empty());
+        assertFalse(e1.tombstone());
+        assertArrayEquals(key1, e1.key());
+        assertArrayEquals(val1, e1.value());
+        assertEquals(1, e1.revision());
+        assertEquals(1, e1.updateCounter());
 
-        while (it.hasNext()) {
-            Entry entry = it.next();
+        assertTrue(it.hasNext());
 
-            assertTrue(expMap.containsKey(new String(entry.key())));
+        e2 = it.next();
+
+        assertFalse(e2.empty());
+        assertFalse(e2.tombstone());
+        assertArrayEquals(key2, e2.key());
+        assertArrayEquals(val2, e2.value());
+        assertEquals(1, e2.revision());
+        assertEquals(2, e2.updateCounter());
+
+        assertFalse(it.hasNext());
+
+        try {
+            it.next();
+
+            fail();
+        }
+        catch (NoSuchElementException ignored) {
+            // Expected: the bounded cursor is exhausted, so next() must throw.
+            // No-op.
         }
     }
 
-    private static void fill(String pref, KeyValueStorage storage, TreeMap<String, String> expMap) {
-        for (int i = 0; i < 100; i++) {
-            String keyStr = pref + '_' + i;
+    @Test
+    public void watchCursorForRange() {
+        byte[] key1 = k(1);
+        byte[] val1_1 = kv(1, 11);
 
-            String valStr = "val_" + i;
+        byte[] key2 = k(2);
+        byte[] val2_1 = kv(2, 21);
+        byte[] val2_2 = kv(2, 22);
+
+        byte[] key3 = k(3);
+        byte[] val3_1 = kv(3, 31);
 
-            expMap.put(keyStr, valStr);
+        assertEquals(0, storage.revision());
+        assertEquals(0, storage.updateCounter());
 
-            byte[] key = keyStr.getBytes();
+        // Watch for all updates starting from revision 2.
+        Cursor<WatchEvent> cur = storage.watch(key1, null, 2);
 
-            byte[] val = valStr.getBytes();
+        Iterator<WatchEvent> it = cur.iterator();
 
-            storage.getAndPut(key, val);
-        }
+        assertFalse(it.hasNext());
+        assertNull(it.next()); // With nothing pending, next() is expected to return null.
+
+        storage.putAll(List.of(key1, key2), List.of(val1_1, val2_1));
+
+        assertEquals(1, storage.revision());
+        assertEquals(2, storage.updateCounter());
+
+        // Revision is 1, i.e. less than the watch start revision 2, so no event is expected.
+        assertFalse(it.hasNext());
+        assertNull(it.next());
+
+        storage.putAll(List.of(key2, key3), List.of(val2_2, val3_1));
+
+        assertEquals(2, storage.revision());
+        assertEquals(4, storage.updateCounter());
+
+        // Revision is 2: both updates of this revision arrive as one batch event.
+        assertTrue(it.hasNext());
+
+        WatchEvent watchEvent = it.next();
+
+        assertTrue(watchEvent.batch());
+
+        Map<Key, EntryEvent> map = watchEvent.entryEvents().stream()
+                .collect(Collectors.toMap(evt -> new Key(evt.entry().key()), identity()));
+
+        assertEquals(2, map.size());
+
+        // First update under revision 2: key2 is overwritten, so the old entry carries val2_1.
+        EntryEvent e2 = map.get(new Key(key2));
+
+        assertNotNull(e2);
+
+        Entry oldEntry2 = e2.oldEntry();
+
+        assertFalse(oldEntry2.empty());
+        assertFalse(oldEntry2.tombstone());
+        assertEquals(1, oldEntry2.revision());
+        assertEquals(2, oldEntry2.updateCounter());
+        assertArrayEquals(key2, oldEntry2.key());
+        assertArrayEquals(val2_1, oldEntry2.value());
+
+        Entry newEntry2 = e2.entry();
+
+        assertFalse(newEntry2.empty());
+        assertFalse(newEntry2.tombstone());
+        assertEquals(2, newEntry2.revision());
+        assertEquals(3, newEntry2.updateCounter());
+        assertArrayEquals(key2, newEntry2.key());
+        assertArrayEquals(val2_2, newEntry2.value());
+
+        // Second update under revision 2: key3 is new, so the old entry is empty.
+        EntryEvent e3 = map.get(new Key(key3));
+
+        assertNotNull(e3);
+
+        Entry oldEntry3 = e3.oldEntry();
+
+        assertTrue(oldEntry3.empty());
+        assertFalse(oldEntry3.tombstone());
+        assertArrayEquals(key3, oldEntry3.key());
+
+        Entry newEntry3 = e3.entry();
+
+        assertFalse(newEntry3.empty());
+        assertFalse(newEntry3.tombstone());
+        assertEquals(2, newEntry3.revision());
+        assertEquals(4, newEntry3.updateCounter());
+        assertArrayEquals(key3, newEntry3.key());
+        assertArrayEquals(val3_1, newEntry3.value());
+
+        assertFalse(it.hasNext());
+
+        storage.remove(key1); // Reported as a non-batch event whose new entry is a tombstone.
+
+        assertTrue(it.hasNext());
+
+        watchEvent = it.next();
+
+        assertFalse(watchEvent.batch());
+
+        EntryEvent e1 = watchEvent.entryEvent();
+
+        Entry oldEntry1 = e1.oldEntry();
+
+        assertFalse(oldEntry1.empty());
+        assertFalse(oldEntry1.tombstone());
+        assertEquals(1, oldEntry1.revision());
+        assertEquals(1, oldEntry1.updateCounter());
+        assertArrayEquals(key1, oldEntry1.key());
+        assertArrayEquals(val1_1, oldEntry1.value());
+
+        Entry newEntry1 = e1.entry();
+
+        assertFalse(newEntry1.empty());
+        assertTrue(newEntry1.tombstone());
+        assertEquals(3, newEntry1.revision());
+        assertEquals(5, newEntry1.updateCounter());
+        assertArrayEquals(key1, newEntry1.key());
+        assertNull(newEntry1.value());
+
+        assertFalse(it.hasNext());
+    }
+
+
+    /** Checks that a single-key watch reports only that key's updates, each with its old and new entry. */
+    @Test
+    public void watchCursorForKey() {
+        byte[] key1 = k(1);
+        byte[] val1_1 = kv(1, 11);
+        byte[] val1_2 = kv(1, 12);
+
+        byte[] key2 = k(2);
+        byte[] val2_1 = kv(2, 21);
+        byte[] val2_2 = kv(2, 22);
+
+        assertEquals(0, storage.revision());
+        assertEquals(0, storage.updateCounter());
+
+        Cursor<WatchEvent> cur = storage.watch(key1, 1);
+
+        Iterator<WatchEvent> it = cur.iterator();
+
+        assertFalse(it.hasNext());
+        assertNull(it.next());
+
+        storage.putAll(List.of(key1, key2), List.of(val1_1, val2_1));
+
+        assertEquals(1, storage.revision());
+        assertEquals(2, storage.updateCounter());
+
+        assertTrue(it.hasNext());
+
+        WatchEvent watchEvent = it.next();
+
+        assertFalse(watchEvent.batch());
+
+        EntryEvent e1 = watchEvent.entryEvent();
+
+        Entry oldEntry1 = e1.oldEntry();
+
+        assertTrue(oldEntry1.empty());
+        assertFalse(oldEntry1.tombstone());
+
+        Entry newEntry1 = e1.entry();
+
+        assertFalse(newEntry1.empty());
+        assertFalse(newEntry1.tombstone());
+        assertEquals(1, newEntry1.revision());
+        assertEquals(1, newEntry1.updateCounter());
+        assertArrayEquals(key1, newEntry1.key());
+        assertArrayEquals(val1_1, newEntry1.value());
+
+        assertFalse(it.hasNext());
+
+        storage.put(key2, val2_2);
+        // key2 is not watched, so its update must not produce an event.
+        assertFalse(it.hasNext());
+
+        storage.put(key1, val1_2);
+
+        assertTrue(it.hasNext());
+
+        watchEvent = it.next();
+
+        assertFalse(watchEvent.batch());
+
+        e1 = watchEvent.entryEvent();
+
+        oldEntry1 = e1.oldEntry();
+
+        assertFalse(oldEntry1.empty());
+        assertFalse(oldEntry1.tombstone());
+        assertEquals(1, oldEntry1.revision());
+        assertEquals(1, oldEntry1.updateCounter());
+        assertArrayEquals(key1, oldEntry1.key());
+        assertArrayEquals(val1_1, oldEntry1.value());
+
+        newEntry1 = e1.entry();
+
+        assertFalse(newEntry1.empty());
+        assertFalse(newEntry1.tombstone());
+        assertEquals(3, newEntry1.revision());
+        assertEquals(4, newEntry1.updateCounter());
+        assertArrayEquals(key1, newEntry1.key());
+        assertArrayEquals(val1_2, newEntry1.value());
+
+        assertFalse(it.hasNext());
+    }
+
+    /** Checks that a multi-key watch reports updates of the watched keys only. */
+    @Test
+    public void watchCursorForKeys() {
+        byte[] key1 = k(1);
+        byte[] val1_1 = kv(1, 11);
+
+        byte[] key2 = k(2);
+        byte[] val2_1 = kv(2, 21);
+        byte[] val2_2 = kv(2, 22);
+
+        byte[] key3 = k(3);
+        byte[] val3_1 = kv(3, 31);
+        byte[] val3_2 = kv(3, 32);
+
+        assertEquals(0, storage.revision());
+        assertEquals(0, storage.updateCounter());
+
+        // Watch key1 and key2 only, starting from revision 1; key3 is deliberately left out.
+        Cursor<WatchEvent> cur = storage.watch(List.of(key1, key2), 1);
+
+        Iterator<WatchEvent> it = cur.iterator();
+
+        assertFalse(it.hasNext());
+        assertNull(it.next());
+
+        storage.putAll(List.of(key1, key2, key3), List.of(val1_1, val2_1, val3_1));
+
+        assertEquals(1, storage.revision());
+        assertEquals(3, storage.updateCounter());
+
+        assertTrue(it.hasNext());
+
+        WatchEvent watchEvent = it.next();
+
+        assertTrue(watchEvent.batch());
+
+        assertFalse(it.hasNext());
+
+        storage.put(key2, val2_2);
+
+        assertTrue(it.hasNext());
+
+        watchEvent = it.next();
+
+        assertFalse(watchEvent.batch());
+
+        assertFalse(it.hasNext());
+
+        storage.put(key3, val3_2);
+        // key3 is not in the watched key set, so no event is expected.
+        assertFalse(it.hasNext());
     }
 
     private static void fill(KeyValueStorage storage, int keySuffix, int num) {
@@ -1055,18 +1389,4 @@ class SimpleInMemoryKeyValueStorageTest {
     private static byte[] kv(int k, int v) {
         return ("key" + k + '_' + "val" + v).getBytes();
     }
-
-    private static class NoOpWatcher implements Watcher {
-        @Override public void register(@NotNull Watch watch) {
-            // No-op.
-        }
-
-        @Override public void notify(@NotNull Entry e) {
-            // No-op.
-        }
-
-        @Override public void cancel(@NotNull Watch watch) {
-            // No-op.
-        }
-    }
 }
\ No newline at end of file