You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by ag...@apache.org on 2021/04/08 18:50:39 UTC

[ignite-3] 01/04: IGNITE-14389 Meta storage: in-memory implementation WIP

This is an automated email from the ASF dual-hosted git repository.

agura pushed a commit to branch ignite-14389
in repository https://gitbox.apache.org/repos/asf/ignite-3.git

commit 7fc965b1a1e9126fd00983f354b846aa1de70d43
Author: Andrey Gura <ag...@apache.org>
AuthorDate: Tue Mar 30 21:21:07 2021 +0300

    IGNITE-14389 Meta storage: in-memory implementation WIP
---
 modules/metastorage-server/pom.xml                 |  60 +++++
 .../ignite/internal/metastorage/server/Entry.java  | 146 +++++++++++
 .../metastorage/server/KeyValueStorage.java        |  28 +++
 .../server/SimpleInMemoryKeyValueStorage.java      | 267 ++++++++++++++++++++
 .../ignite/internal/metastorage/server/Watch.java  |  45 ++++
 .../internal/metastorage/server/Watcher.java       |  13 +
 .../internal/metastorage/server/WatcherImpl.java   |  58 +++++
 .../server/SimpleInMemoryKeyValueStorageTest.java  | 274 +++++++++++++++++++++
 pom.xml                                            |   1 +
 9 files changed, 892 insertions(+)

diff --git a/modules/metastorage-server/pom.xml b/modules/metastorage-server/pom.xml
new file mode 100644
index 0000000..3c51fc5
--- /dev/null
+++ b/modules/metastorage-server/pom.xml
@@ -0,0 +1,60 @@
+<?xml version="1.0" encoding="UTF-8"?>
+
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements.  See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+  -->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+
+    <parent>
+        <groupId>org.apache.ignite</groupId>
+        <artifactId>ignite-parent</artifactId>
+        <version>1</version>
+        <relativePath>../../parent/pom.xml</relativePath>
+    </parent>
+
+    <artifactId>metastorage-server</artifactId>
+    <version>3.0.0-SNAPSHOT</version>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.ignite</groupId>
+            <artifactId>ignite-core</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.jetbrains</groupId>
+            <artifactId>annotations</artifactId>
+        </dependency>
+
+        <!-- Test dependencies. -->
+        <dependency>
+            <groupId>org.junit.jupiter</groupId>
+            <artifactId>junit-jupiter-api</artifactId>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.junit.jupiter</groupId>
+            <artifactId>junit-jupiter-engine</artifactId>
+            <scope>test</scope>
+        </dependency>
+    </dependencies>
+</project>
diff --git a/modules/metastorage-server/src/main/java/org/apache/ignite/internal/metastorage/server/Entry.java b/modules/metastorage-server/src/main/java/org/apache/ignite/internal/metastorage/server/Entry.java
new file mode 100644
index 0000000..442aef9
--- /dev/null
+++ b/modules/metastorage-server/src/main/java/org/apache/ignite/internal/metastorage/server/Entry.java
@@ -0,0 +1,146 @@
+package org.apache.ignite.internal.metastorage.server;
+
+import org.jetbrains.annotations.NotNull;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Represents a storage unit as entry with key, value and revision, where
+ * <ul>
+ *     <li>key - an unique entry's key represented by an array of bytes. Keys are comparable in lexicographic manner.</li>
+ *     <ul>value - a data which is associated with a key and represented as an array of bytes.</ul>
+ *     <ul>revision - a number which denotes a version of whole meta storage. Each change increments the revision.</ul>
+ * </ul>
+ *
+ * Instance of {@link #Entry} could represents:
+ * <ul>
+ *     <li>A regular entry which stores a particular key, a value and a revision number.</li>
+ *     <li>An empty entry which denotes absence a regular entry in the meta storage for a given key.
+ *     A revision is 0 for such kind of entry.</li>
+ *     <li>A tombstone entry which denotes that a regular entry for a given key was removed from storage on some revision.</li>
+ * </ul>
+ */
+//TODO: Separate client and server entries. Empty and tombstone for client is the same.
+public class Entry {
+    /** Entry key. Couldn't be {@code null}. */
+    @NotNull
+    final private byte[] key;
+
+    /**
+     * Entry value.
+     * <p>
+     *     {@code val == null} only for {@link #empty()} and {@link #tombstone()} entries.
+     * </p>
+     */
+    @Nullable
+    final private byte[] val;
+
+    /**
+     * Revision number corresponding to this particular entry.
+     * <p>
+     *     {@code rev == 0} for {@link #empty()} entry,
+     *     {@code rev > 0} for regular and {@link #tombstone()} entries.
+     * </p>
+     */
+    final private long rev;
+
+    /**
+     * Constructor.
+     *
+     * @param key Key bytes. Couldn't be {@code null}.
+     * @param val Value bytes. Couldn't be {@code null}.
+     * @param rev Revision.
+     */
+    // TODO: It seems user will never create Entry, so we can reduce constructor scope to protected or package-private and reuse it from two-place private constructor.
+    public Entry(@NotNull byte[] key, @NotNull byte[] val, long rev) {
+        assert key != null : "key can't be null";
+        assert val != null : "value can't be null";
+
+        this.key = key;
+        this.val = val;
+        this.rev = rev;
+    }
+
+    /**
+     * Constructor for empty and tombstone entries.
+     *
+     * @param key Key bytes. Couldn't be {@code null}.
+     * @param rev Revision.
+     */
+    private Entry(@NotNull byte[] key, long rev) {
+        assert key != null : "key can't be null";
+
+        this.key = key;
+        this.val = null;
+        this.rev = rev;
+    }
+
+    /**
+     * Creates an instance of empty entry for a given key.
+     *
+     * @param key Key bytes. Couldn't be {@code null}.
+     * @return Empty entry.
+     */
+    @NotNull
+    public static Entry empty(byte[] key) {
+        return new Entry(key, 0);
+    }
+
+    /**
+     * Creates an instance of tombstone entry for a given key and a revision.
+     *
+     * @param key Key bytes. Couldn't be {@code null}.
+     * @return Empty entry.
+     */
+    @NotNull
+    public static Entry tombstone(byte[] key, long rev) {
+        assert rev > 0 : "rev must be positive for tombstone entry.";
+
+        return new Entry(key, rev);
+    }
+
+    /**
+     * Returns a key.
+     *
+     * @return Key.
+     */
+    @NotNull
+    public byte[] key() {
+        return key;
+    }
+
+    /**
+     * Returns a value.
+     *
+     * @return Value.
+     */
+    @Nullable
+    public byte[] value() {
+        return val;
+    }
+
+    /**
+     * Returns a revision.
+     * @return Revision.
+     */
+    public long revision() {
+        return rev;
+    }
+
+    /**
+     * Returns value which denotes whether entry is tombstone or not.
+     *
+     * @return {@code True} if entry is tombstone, otherwise - {@code false}.
+     */
+    public boolean tombstone() {
+        return val == null && rev > 0;
+    }
+
+    /**
+     * Returns value which denotes whether entry is empty or not.
+     *
+     * @return {@code True} if entry is empty, otherwise - {@code false}.
+     */
+    public boolean empty() {
+        return val == null && rev == 0;
+    }
+}
diff --git a/modules/metastorage-server/src/main/java/org/apache/ignite/internal/metastorage/server/KeyValueStorage.java b/modules/metastorage-server/src/main/java/org/apache/ignite/internal/metastorage/server/KeyValueStorage.java
new file mode 100644
index 0000000..1bf6b78
--- /dev/null
+++ b/modules/metastorage-server/src/main/java/org/apache/ignite/internal/metastorage/server/KeyValueStorage.java
@@ -0,0 +1,28 @@
+package org.apache.ignite.internal.metastorage.server;
+
+import org.jetbrains.annotations.NotNull;
+
+import java.util.Iterator;
+
+public interface KeyValueStorage {
+
+    long revision();
+
+    @NotNull
+    Entry put(byte[] key, byte[] value);
+
+    @NotNull
+    Entry get(byte[] key);
+
+    @NotNull
+    Entry get(byte[] key, long rev);
+
+    @NotNull
+    Entry remove(byte[] key);
+
+    Iterator<Entry> iterate(byte[] key);
+
+    //Iterator<Entry> iterate(long rev);
+
+    void compact();
+}
diff --git a/modules/metastorage-server/src/main/java/org/apache/ignite/internal/metastorage/server/SimpleInMemoryKeyValueStorage.java b/modules/metastorage-server/src/main/java/org/apache/ignite/internal/metastorage/server/SimpleInMemoryKeyValueStorage.java
new file mode 100644
index 0000000..9059aec
--- /dev/null
+++ b/modules/metastorage-server/src/main/java/org/apache/ignite/internal/metastorage/server/SimpleInMemoryKeyValueStorage.java
@@ -0,0 +1,267 @@
+package org.apache.ignite.internal.metastorage.server;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Comparator;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.NavigableMap;
+import java.util.NoSuchElementException;
+import java.util.TreeMap;
+import org.jetbrains.annotations.NotNull;
+import org.jetbrains.annotations.TestOnly;
+
+/**
+ * WARNING: Only for test purposes and only for non-distributed (one static instance) storage.
+ */
+public class SimpleInMemoryKeyValueStorage implements KeyValueStorage {
+    private static final Comparator<byte[]> LEXICOGRAPHIC_COMPARATOR = Arrays::compare;
+
+    private static final byte[] TOMBSTONE = new byte[0];
+
+    private static final long LATEST_REV = -1;
+
+    private final Watcher watcher;
+
+    private NavigableMap<byte[], List<Long>> keysIdx = new TreeMap<>(LEXICOGRAPHIC_COMPARATOR);
+
+    private NavigableMap<Long, NavigableMap<byte[], byte[]>> revsIdx = new TreeMap<>();
+
+    private long grev = 0;
+
+    private final Object mux = new Object();
+
+    public SimpleInMemoryKeyValueStorage(Watcher watcher) {
+        this.watcher = watcher;
+    }
+
+    @Override public long revision() {
+        return grev;
+    }
+
+    @NotNull
+    @Override public Entry put(byte[] key, byte[] val) {
+        synchronized (mux) {
+            long crev = ++grev;
+
+            // Update keysIdx.
+            List<Long> revs = keysIdx.computeIfAbsent(key, k -> new ArrayList<>());
+
+            long lrev = revs.isEmpty() ? 0 : lastRevision(revs);
+
+            revs.add(crev);
+
+            // Update revsIdx.
+            NavigableMap<byte[], byte[]> entries = new TreeMap<>(LEXICOGRAPHIC_COMPARATOR);
+
+            entries.put(key, val);
+
+            revsIdx.put(crev, entries);
+
+            // Return previous value.
+            if (lrev == 0)
+                return Entry.empty(key);
+
+            NavigableMap<byte[], byte[]> lastVal = revsIdx.get(lrev);
+
+            Entry res = new Entry(key, lastVal.get(key), lrev);
+
+            //TODO: notify watchers
+
+            return res;
+        }
+    }
+
+    @NotNull
+    @Override public Entry get(byte[] key) {
+        synchronized (mux) {
+            return doGet(key, LATEST_REV);
+        }
+    }
+
+    @NotNull
+    @TestOnly
+    @Override public Entry get(byte[] key, long rev) {
+        synchronized (mux) {
+            return doGet(key, rev);
+        }
+    }
+
+    @NotNull
+    @Override public Entry remove(byte[] key) {
+        synchronized (mux) {
+            Entry e = doGet(key, LATEST_REV);
+
+            if (e.value() == null)
+                return e;
+
+            return put(key, TOMBSTONE);
+        }
+    }
+
+    @Override public Iterator<Entry> iterate(byte[] keyFrom) {
+        synchronized (mux) {
+            NavigableMap<byte[], List<Long>> tailMap = keysIdx.tailMap(keyFrom, true);
+
+            final Iterator<Map.Entry<byte[], List<Long>>> it = tailMap.entrySet().iterator();
+
+            return new Iterator<>() {
+                private Map.Entry<byte[], List<Long>> curr;
+                private boolean hasNext;
+
+                private void advance() {
+                    if (it.hasNext()) {
+                        Map.Entry<byte[], List<Long>> e = it.next();
+
+                        byte[] key = e.getKey();
+
+                        if (!isPrefix(keyFrom, key))
+                            hasNext = false;
+                        else {
+                            curr = e;
+
+                            hasNext = true;
+                        }
+                    } else
+                        hasNext = false;
+                }
+
+                @Override
+                public boolean hasNext() {
+                    synchronized (mux) {
+                        if (curr == null)
+                            advance();
+
+                        return hasNext;
+                    }
+                }
+
+                @Override
+                public Entry next() {
+                    synchronized (mux) {
+                        if (!hasNext())
+                            throw new NoSuchElementException();
+
+                        Map.Entry<byte[], List<Long>> e = curr;
+
+                        curr = null;
+
+                        byte[] key = e.getKey();
+
+                        List<Long> revs = e.getValue();
+
+                        long rev = revs == null || revs.isEmpty() ? 0 : lastRevision(revs);
+
+                        if (rev == 0) {
+                            throw new IllegalStateException("rev == 0");
+                            //return new AbstractMap.SimpleImmutableEntry<>(key, null);
+                        }
+
+                        NavigableMap<byte[], byte[]> vals = revsIdx.get(rev);
+
+                        if (vals == null || vals.isEmpty()) {
+                            throw new IllegalStateException("vals == null || vals.isEmpty()");
+                            //return new AbstractMap.SimpleImmutableEntry<>(key, null);
+                        }
+
+                        byte[] val = vals.get(key);
+
+                        return val == TOMBSTONE ? Entry.tombstone(key, rev) : new Entry(key, val, rev);
+                    }
+                }
+            };
+        }
+    }
+
+    @Override public void compact() {
+        synchronized (mux) {
+            NavigableMap<byte[], List<Long>> compactedKeysIdx = new TreeMap<>(LEXICOGRAPHIC_COMPARATOR);
+
+            NavigableMap<Long, NavigableMap<byte[], byte[]>> compactedRevsIdx = new TreeMap<>();
+
+            keysIdx.forEach((key, revs) -> compactForKey(key, revs, compactedKeysIdx, compactedRevsIdx));
+
+            keysIdx = compactedKeysIdx;
+
+            revsIdx = compactedRevsIdx;
+        }
+    }
+
+    private void compactForKey(
+            byte[] key,
+            List<Long> revs,
+            NavigableMap<byte[], List<Long>> compactedKeysIdx,
+            NavigableMap<Long, NavigableMap<byte[], byte[]>> compactedRevsIdx
+    ) {
+        Long lrev = lastRevision(revs);
+
+        NavigableMap<byte[], byte[]> kv = revsIdx.get(lrev);
+
+        byte[] lastVal = kv.get(key);
+
+        if (lastVal != TOMBSTONE) {
+            compactedKeysIdx.put(key, listOf(lrev));
+
+            NavigableMap<byte[], byte[]> compactedKv = compactedRevsIdx.computeIfAbsent(
+                    lrev,
+                    k -> new TreeMap<>(LEXICOGRAPHIC_COMPARATOR)
+            );
+
+            compactedKv.put(key, lastVal);
+        }
+    }
+
+    /**
+     * Returns entry for given key.
+     *
+     * @param key Key.
+     * @param rev Revision.
+     * @return Entry for given key.
+     */
+    @NotNull private Entry doGet(byte[] key, long rev) {
+        List<Long> revs = keysIdx.get(key);
+
+        if (revs == null || revs.isEmpty())
+            return Entry.empty(key);
+
+        long lrev = rev == LATEST_REV ? lastRevision(revs) : rev;
+
+        NavigableMap<byte[], byte[]> entries = revsIdx.get(lrev);
+
+        if (entries == null || entries.isEmpty())
+            return Entry.empty(key);
+
+        byte[] val = entries.get(key);
+
+        if (val == TOMBSTONE)
+            return Entry.tombstone(key, lrev);
+
+        return new Entry(key, val , lrev);
+    }
+
+    private static boolean isPrefix(byte[] pref, byte[] term) {
+        if (pref.length > term.length)
+            return false;
+
+        for (int i = 0; i < pref.length - 1; i++) {
+            if (pref[i] != term[i])
+                return false;
+        }
+
+        return true;
+    }
+
+    private static long lastRevision(List<Long> revs) {
+        return revs.get(revs.size() - 1);
+    }
+
+    private static  List<Long> listOf(long val) {
+        List<Long> res = new ArrayList<>();
+
+        res.add(val);
+
+        return res;
+    }
+
+}
diff --git a/modules/metastorage-server/src/main/java/org/apache/ignite/internal/metastorage/server/Watch.java b/modules/metastorage-server/src/main/java/org/apache/ignite/internal/metastorage/server/Watch.java
new file mode 100644
index 0000000..26cfa5c
--- /dev/null
+++ b/modules/metastorage-server/src/main/java/org/apache/ignite/internal/metastorage/server/Watch.java
@@ -0,0 +1,45 @@
+package org.apache.ignite.internal.metastorage.server;
+
+import org.jetbrains.annotations.Nullable;
+
+import java.util.Arrays;
+import java.util.Comparator;
+
+public class Watch {
+    private static final Comparator<byte[]> CMP = Arrays::compare;
+
+    private static final long ANY_REVISION = -1;
+
+    @Nullable
+    private byte[] startKey;
+
+    @Nullable
+    private byte[] endKey;
+
+    long rev = ANY_REVISION;
+
+    public void startKey(byte[] startKey) {
+        this.startKey = startKey;
+    }
+
+    public void endKey(byte[] endKey) {
+        this.endKey = endKey;
+    }
+
+    public void revision(long rev) {
+        this.rev = rev;
+    }
+
+    public void notify(Entry e) {
+        if (startKey != null && CMP.compare(e.key(), startKey) < 0)
+            return;
+
+        if (endKey != null && CMP.compare(e.key(), endKey) > 0)
+            return;
+
+        if (rev != ANY_REVISION && e.revision() <= rev)
+            return;
+
+        System.out.println("Entry: key=" + new String(e.key()) + ", value=" + new String(e.value()) + ", rev=" + e.revision());
+    }
+}
diff --git a/modules/metastorage-server/src/main/java/org/apache/ignite/internal/metastorage/server/Watcher.java b/modules/metastorage-server/src/main/java/org/apache/ignite/internal/metastorage/server/Watcher.java
new file mode 100644
index 0000000..5516d06
--- /dev/null
+++ b/modules/metastorage-server/src/main/java/org/apache/ignite/internal/metastorage/server/Watcher.java
@@ -0,0 +1,13 @@
+package org.apache.ignite.internal.metastorage.server;
+
+import org.jetbrains.annotations.NotNull;
+
+public interface Watcher {
+    void register(@NotNull Watch watch);
+
+    void notify(@NotNull Entry e);
+
+    //TODO: implement
+    void cancel(@NotNull Watch watch);
+}
+
diff --git a/modules/metastorage-server/src/main/java/org/apache/ignite/internal/metastorage/server/WatcherImpl.java b/modules/metastorage-server/src/main/java/org/apache/ignite/internal/metastorage/server/WatcherImpl.java
new file mode 100644
index 0000000..dc126a0
--- /dev/null
+++ b/modules/metastorage-server/src/main/java/org/apache/ignite/internal/metastorage/server/WatcherImpl.java
@@ -0,0 +1,58 @@
+package org.apache.ignite.internal.metastorage.server;
+
+import org.jetbrains.annotations.NotNull;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+
+public class WatcherImpl implements Watcher {
+    private final BlockingQueue<Entry> queue = new LinkedBlockingQueue<>();
+
+    private final List<Watch> watches = new ArrayList<>();
+
+    private volatile boolean stop;
+
+    private final Object mux = new Object();
+
+    @Override public void register(@NotNull Watch watch) {
+        synchronized (mux) {
+            watches.add(watch);
+        }
+    }
+
+    @Override public void notify(@NotNull Entry e) {
+        queue.offer(e);
+    }
+
+    @Override
+    public void cancel(@NotNull Watch watch) {
+        throw new UnsupportedOperationException("Not implemented yet.");
+    }
+
+    public void shutdown() {
+        stop = true;
+    }
+
+    private class WatcherWorker implements Runnable {
+        @Override public void run() {
+            while (!stop) {
+                try {
+                    Entry e = queue.poll(100, TimeUnit.MILLISECONDS);
+
+                    if (e != null) {
+                        synchronized (mux) {
+                            watches.forEach(w -> w.notify(e));
+                        }
+                    }
+                }
+                catch (InterruptedException interruptedException) {
+                    // No-op.
+                }
+            }
+        }
+    }
+}
+
diff --git a/modules/metastorage-server/src/test/java/org/apache/ignite/internal/metastorage/server/SimpleInMemoryKeyValueStorageTest.java b/modules/metastorage-server/src/test/java/org/apache/ignite/internal/metastorage/server/SimpleInMemoryKeyValueStorageTest.java
new file mode 100644
index 0000000..f7fb17e
--- /dev/null
+++ b/modules/metastorage-server/src/test/java/org/apache/ignite/internal/metastorage/server/SimpleInMemoryKeyValueStorageTest.java
@@ -0,0 +1,274 @@
+package org.apache.ignite.internal.metastorage.server;
+
+import org.jetbrains.annotations.NotNull;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.util.Iterator;
+import java.util.Map;
+import java.util.TreeMap;
+
+import static org.junit.jupiter.api.Assertions.assertArrayEquals;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+class SimpleInMemoryKeyValueStorageTest {
+    private KeyValueStorage storage;
+
+    @BeforeEach
+    public void setUp() {
+        storage = new SimpleInMemoryKeyValueStorage(new NoOpWatcher());
+    }
+
+    @Test
+    void putGetRemoveCompact() {
+        byte[] key1 = k(1);
+        byte[] val1_1 = kv(1, 1);
+        byte[] val1_3 = kv(1, 3);
+
+        byte[] key2 = k(2);
+        byte[] val2_2 = kv(2, 2);
+
+        assertEquals(0, storage.revision());
+
+        // Previous entry is empty.
+        Entry emptyEntry = storage.put(key1, val1_1);
+
+        assertEquals(1, storage.revision());
+        assertTrue(emptyEntry.empty());
+
+        // Entry with rev == 1.
+        Entry e1_1 = storage.get(key1);
+
+        assertFalse(e1_1.empty());
+        assertFalse(e1_1.tombstone());
+        assertArrayEquals(key1, e1_1.key());
+        assertArrayEquals(val1_1, e1_1.value());
+        assertEquals(1, e1_1.revision());
+        assertEquals(1, storage.revision());
+
+        // Previous entry is empty.
+        emptyEntry = storage.put(key2, val2_2);
+
+        assertEquals(2, storage.revision());
+        assertTrue(emptyEntry.empty());
+
+        // Entry with rev == 2.
+        Entry e2 = storage.get(key2);
+
+        assertFalse(e2.empty());
+        assertFalse(e2.tombstone());
+        assertArrayEquals(key2, e2.key());
+        assertArrayEquals(val2_2, e2.value());
+        assertEquals(2, e2.revision());
+        assertEquals(2, storage.revision());
+
+        // Previous entry is not empty.
+        e1_1 = storage.put(key1, val1_3);
+
+        assertFalse(e1_1.empty());
+        assertFalse(e1_1.tombstone());
+        assertArrayEquals(key1, e1_1.key());
+        assertArrayEquals(val1_1, e1_1.value());
+        assertEquals(1, e1_1.revision());
+        assertEquals(3, storage.revision());
+
+        // Entry with rev == 3.
+        Entry e1_3 = storage.get(key1);
+
+        assertFalse(e1_3.empty());
+        assertFalse(e1_3.tombstone());
+        assertArrayEquals(key1, e1_3.key());
+        assertArrayEquals(val1_3, e1_3.value());
+        assertEquals(3, e1_3.revision());
+        assertEquals(3, storage.revision());
+
+        // Remove existing entry.
+        Entry e2_2 = storage.remove(key2);
+
+        assertFalse(e2_2.empty());
+        assertFalse(e2_2.tombstone());
+        assertArrayEquals(key2, e2_2.key());
+        assertArrayEquals(val2_2, e2_2.value());
+        assertEquals(2, e2_2.revision());
+        assertEquals(4, storage.revision()); // Storage revision is changed.
+
+        // Remove already removed entry.
+        Entry tombstoneEntry = storage.remove(key2);
+
+        assertFalse(tombstoneEntry.empty());
+        assertTrue(tombstoneEntry.tombstone());
+        assertEquals(4, storage.revision()); // Storage revision is not changed.
+
+        // Compact and check that tombstones are removed.
+        storage.compact();
+
+        assertEquals(4, storage.revision());
+        assertTrue(storage.remove(key2).empty());
+        assertTrue(storage.get(key2).empty());
+
+        // Remove existing entry.
+        e1_3 = storage.remove(key1);
+
+        assertFalse(e1_3.empty());
+        assertFalse(e1_3.tombstone());
+        assertArrayEquals(key1, e1_3.key());
+        assertArrayEquals(val1_3, e1_3.value());
+        assertEquals(3, e1_3.revision());
+        assertEquals(5, storage.revision()); // Storage revision is changed.
+
+        // Remove already removed entry.
+        tombstoneEntry = storage.remove(key1);
+
+        assertFalse(tombstoneEntry.empty());
+        assertTrue(tombstoneEntry.tombstone());
+        assertEquals(5, storage.revision()); // // Storage revision is not changed.
+
+        // Compact and check that tombstones are removed.
+        storage.compact();
+
+        assertEquals(5, storage.revision());
+        assertTrue(storage.remove(key1).empty());
+        assertTrue(storage.get(key1).empty());
+    }
+
+    @Test
+    void compact() {
+        assertEquals(0, storage.revision());
+
+        // Compact empty.
+        storage.compact();
+
+        assertEquals(0, storage.revision());
+
+        // Compact non-empty.
+        fill(storage, 1, 1);
+
+        assertEquals(1, storage.revision());
+
+        fill(storage, 2, 2);
+
+        assertEquals(3, storage.revision());
+
+        fill(storage, 3, 3);
+
+        assertEquals(6, storage.revision());
+
+        storage.remove(k(3));
+
+        assertEquals(7, storage.revision());
+        assertTrue(storage.get(k(3)).tombstone());
+
+        storage.compact();
+
+        assertEquals(7, storage.revision());
+
+        Entry e1 = storage.get(k(1));
+
+        assertFalse(e1.empty());
+        assertFalse(e1.tombstone());
+        assertArrayEquals(k(1), e1.key());
+        assertArrayEquals(kv(1,1), e1.value());
+        assertEquals(1, e1.revision());
+
+        Entry e2 = storage.get(k(2));
+
+        assertFalse(e2.empty());
+        assertFalse(e2.tombstone());
+        assertArrayEquals(k(2), e2.key());
+        assertArrayEquals(kv(2,2), e2.value());
+        assertTrue(storage.get(k(2), 2).empty());
+        assertEquals(3, e2.revision());
+
+        Entry e3 = storage.get(k(3));
+
+        assertTrue(e3.empty());
+        assertTrue(storage.get(k(3), 5).empty());
+        assertTrue(storage.get(k(3), 6).empty());
+        assertTrue(storage.get(k(3), 7).empty());
+    }
+
+    @Test
+    void iterate() {
+        TreeMap<String, String> expFooMap = new TreeMap<>();
+        TreeMap<String, String> expKeyMap = new TreeMap<>();
+        TreeMap<String, String> expZooMap = new TreeMap<>();
+
+        fill("foo", storage, expFooMap);
+        fill("key", storage, expKeyMap);
+        fill("zoo", storage, expZooMap);
+
+        assertEquals(300, storage.revision());
+
+        assertIterate("key", storage, expKeyMap);
+        assertIterate("zoo", storage, expZooMap);
+        assertIterate("foo", storage, expFooMap);
+    }
+
+    private void assertIterate(String pref,  KeyValueStorage storage, TreeMap<String, String> expMap) {
+        Iterator<Entry> it = storage.iterate((pref + "_").getBytes());
+        Iterator<Map.Entry<String, String>> expIt = expMap.entrySet().iterator();
+
+        // Order.
+        while (it.hasNext()) {
+            Entry entry = it.next();
+            Map.Entry<String, String> expEntry = expIt.next();
+
+            assertEquals(expEntry.getKey(), new String(entry.key()));
+            assertEquals(expEntry.getValue(), new String(entry.value()));
+        }
+
+        // Range boundaries.
+        it = storage.iterate((pref + '_').getBytes());
+
+        while (it.hasNext()) {
+            Entry entry = it.next();
+
+            assertTrue(expMap.containsKey(new String(entry.key())));
+        }
+    }
+
+    private static void fill(String pref, KeyValueStorage storage, TreeMap<String, String> expMap) {
+        for (int i = 0; i < 100; i++) {
+            String keyStr = pref + '_' + i;
+
+            String valStr = "val_" + i;
+
+            expMap.put(keyStr, valStr);
+
+            byte[] key = keyStr.getBytes();
+
+            byte[] val = valStr.getBytes();
+
+            storage.put(key, val);
+        }
+    }
+
+    private static void fill(KeyValueStorage storage, int keySuffix, int num) {
+        for (int i = 0; i < num; i++)
+            storage.put(k(keySuffix), kv(keySuffix, i + 1));
+    }
+
+    private static byte[] k(int k) {
+        return ("key" + k).getBytes();
+    }
+
+    private static byte[] kv(int k, int v) {
+        return ("key" + k + '_' + "val" + v).getBytes();
+    }
+
+    private static class NoOpWatcher implements Watcher {
+        @Override public void register(@NotNull Watch watch) {
+            // No-op.
+        }
+
+        @Override public void notify(@NotNull Entry e) {
+            // No-op.
+        }
+
+        @Override public void cancel(@NotNull Watch watch) {
+            // No-op.
+        }
+    }
+}
\ No newline at end of file
diff --git a/pom.xml b/pom.xml
index 4e87920..5557667 100644
--- a/pom.xml
+++ b/pom.xml
@@ -43,6 +43,7 @@
         <module>modules/core</module>
         <module>modules/metastorage-client</module>
         <module>modules/metastorage-common</module>
+        <module>modules/metastorage-server</module>
         <module>modules/network</module>
         <module>modules/raft</module>
         <module>modules/raft-client</module>