You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by ag...@apache.org on 2021/05/10 20:48:30 UTC
[ignite-3] 01/09: IGNITE-14389 Meta storage: in-memory
implementation WIP
This is an automated email from the ASF dual-hosted git repository.
agura pushed a commit to branch ignite-14389
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
commit 71f810fcbaac42c08317ced466a3314a61d139d1
Author: Andrey Gura <ag...@apache.org>
AuthorDate: Tue Mar 30 21:21:07 2021 +0300
IGNITE-14389 Meta storage: in-memory implementation WIP
---
modules/metastorage-server/pom.xml | 60 +++++
.../ignite/internal/metastorage/server/Entry.java | 146 +++++++++++
.../metastorage/server/KeyValueStorage.java | 28 +++
.../server/SimpleInMemoryKeyValueStorage.java | 267 ++++++++++++++++++++
.../ignite/internal/metastorage/server/Watch.java | 45 ++++
.../internal/metastorage/server/Watcher.java | 13 +
.../internal/metastorage/server/WatcherImpl.java | 58 +++++
.../server/SimpleInMemoryKeyValueStorageTest.java | 274 +++++++++++++++++++++
pom.xml | 1 +
9 files changed, 892 insertions(+)
diff --git a/modules/metastorage-server/pom.xml b/modules/metastorage-server/pom.xml
new file mode 100644
index 0000000..3c51fc5
--- /dev/null
+++ b/modules/metastorage-server/pom.xml
@@ -0,0 +1,60 @@
+<?xml version="1.0" encoding="UTF-8"?>
+
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+ -->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+
+ <parent>
+ <groupId>org.apache.ignite</groupId>
+ <artifactId>ignite-parent</artifactId>
+ <version>1</version>
+ <relativePath>../../parent/pom.xml</relativePath>
+ </parent>
+
+ <artifactId>metastorage-server</artifactId>
+ <version>3.0.0-SNAPSHOT</version>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.ignite</groupId>
+ <artifactId>ignite-core</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.jetbrains</groupId>
+ <artifactId>annotations</artifactId>
+ </dependency>
+
+ <!-- Test dependencies. -->
+ <dependency>
+ <groupId>org.junit.jupiter</groupId>
+ <artifactId>junit-jupiter-api</artifactId>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.junit.jupiter</groupId>
+ <artifactId>junit-jupiter-engine</artifactId>
+ <scope>test</scope>
+ </dependency>
+ </dependencies>
+</project>
diff --git a/modules/metastorage-server/src/main/java/org/apache/ignite/internal/metastorage/server/Entry.java b/modules/metastorage-server/src/main/java/org/apache/ignite/internal/metastorage/server/Entry.java
new file mode 100644
index 0000000..442aef9
--- /dev/null
+++ b/modules/metastorage-server/src/main/java/org/apache/ignite/internal/metastorage/server/Entry.java
@@ -0,0 +1,146 @@
+package org.apache.ignite.internal.metastorage.server;
+
+import org.jetbrains.annotations.NotNull;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Represents a storage unit as entry with key, value and revision, where
+ * <ul>
+ * <li>key - a unique entry's key represented by an array of bytes. Keys are comparable in lexicographic manner.</li>
+ * <li>value - data which is associated with a key and represented as an array of bytes.</li>
+ * <li>revision - a number which denotes a version of the whole meta storage. Each change increments the revision.</li>
+ * </ul>
+ *
+ * An instance of {@link Entry} can represent:
+ * <ul>
+ * <li>A regular entry which stores a particular key, a value and a revision number.</li>
+ * <li>An empty entry which denotes the absence of a regular entry in the meta storage for a given key.
+ * A revision is 0 for such kind of entry.</li>
+ * <li>A tombstone entry which denotes that a regular entry for a given key was removed from storage on some revision.</li>
+ * </ul>
+ */
+//TODO: Separate client and server entries. Empty and tombstone for client is the same.
+public class Entry {
+ /** Entry key. Couldn't be {@code null}. */
+ @NotNull
+ final private byte[] key;
+
+ /**
+ * Entry value.
+ * <p>
+ * {@code val == null} only for {@link #empty()} and {@link #tombstone()} entries.
+ * </p>
+ */
+ @Nullable
+ final private byte[] val;
+
+ /**
+ * Revision number corresponding to this particular entry.
+ * <p>
+ * {@code rev == 0} for {@link #empty()} entry,
+ * {@code rev > 0} for regular and {@link #tombstone()} entries.
+ * </p>
+ */
+ final private long rev;
+
+ /**
+ * Constructor.
+ *
+ * @param key Key bytes. Couldn't be {@code null}.
+ * @param val Value bytes. Couldn't be {@code null}.
+ * @param rev Revision.
+ */
+ // TODO: It seems user will never create Entry, so we can reduce constructor scope to protected or package-private and reuse it from two-place private constructor.
+ public Entry(@NotNull byte[] key, @NotNull byte[] val, long rev) {
+ assert key != null : "key can't be null";
+ assert val != null : "value can't be null";
+
+ this.key = key;
+ this.val = val;
+ this.rev = rev;
+ }
+
+ /**
+ * Constructor for empty and tombstone entries.
+ *
+ * @param key Key bytes. Couldn't be {@code null}.
+ * @param rev Revision.
+ */
+ private Entry(@NotNull byte[] key, long rev) {
+ assert key != null : "key can't be null";
+
+ this.key = key;
+ this.val = null;
+ this.rev = rev;
+ }
+
+ /**
+ * Creates an instance of empty entry for a given key.
+ *
+ * @param key Key bytes. Couldn't be {@code null}.
+ * @return Empty entry.
+ */
+ @NotNull
+ public static Entry empty(byte[] key) {
+ return new Entry(key, 0);
+ }
+
+ /**
+ * Creates an instance of tombstone entry for a given key and a revision.
+ *
+ * @param key Key bytes. Couldn't be {@code null}.
+ * @return Tombstone entry.
+ */
+ @NotNull
+ public static Entry tombstone(byte[] key, long rev) {
+ assert rev > 0 : "rev must be positive for tombstone entry.";
+
+ return new Entry(key, rev);
+ }
+
+ /**
+ * Returns a key.
+ *
+ * @return Key.
+ */
+ @NotNull
+ public byte[] key() {
+ return key;
+ }
+
+ /**
+ * Returns a value.
+ *
+ * @return Value.
+ */
+ @Nullable
+ public byte[] value() {
+ return val;
+ }
+
+ /**
+ * Returns a revision.
+ * @return Revision.
+ */
+ public long revision() {
+ return rev;
+ }
+
+ /**
+ * Returns value which denotes whether entry is tombstone or not.
+ *
+ * @return {@code True} if entry is tombstone, otherwise - {@code false}.
+ */
+ public boolean tombstone() {
+ return val == null && rev > 0;
+ }
+
+ /**
+ * Returns value which denotes whether entry is empty or not.
+ *
+ * @return {@code True} if entry is empty, otherwise - {@code false}.
+ */
+ public boolean empty() {
+ return val == null && rev == 0;
+ }
+}
diff --git a/modules/metastorage-server/src/main/java/org/apache/ignite/internal/metastorage/server/KeyValueStorage.java b/modules/metastorage-server/src/main/java/org/apache/ignite/internal/metastorage/server/KeyValueStorage.java
new file mode 100644
index 0000000..1bf6b78
--- /dev/null
+++ b/modules/metastorage-server/src/main/java/org/apache/ignite/internal/metastorage/server/KeyValueStorage.java
@@ -0,0 +1,28 @@
+package org.apache.ignite.internal.metastorage.server;
+
+import org.jetbrains.annotations.NotNull;
+
+import java.util.Iterator;
+
+public interface KeyValueStorage {
+
+ long revision();
+
+ @NotNull
+ Entry put(byte[] key, byte[] value);
+
+ @NotNull
+ Entry get(byte[] key);
+
+ @NotNull
+ Entry get(byte[] key, long rev);
+
+ @NotNull
+ Entry remove(byte[] key);
+
+ Iterator<Entry> iterate(byte[] key);
+
+ //Iterator<Entry> iterate(long rev);
+
+ void compact();
+}
diff --git a/modules/metastorage-server/src/main/java/org/apache/ignite/internal/metastorage/server/SimpleInMemoryKeyValueStorage.java b/modules/metastorage-server/src/main/java/org/apache/ignite/internal/metastorage/server/SimpleInMemoryKeyValueStorage.java
new file mode 100644
index 0000000..9059aec
--- /dev/null
+++ b/modules/metastorage-server/src/main/java/org/apache/ignite/internal/metastorage/server/SimpleInMemoryKeyValueStorage.java
@@ -0,0 +1,267 @@
+package org.apache.ignite.internal.metastorage.server;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Comparator;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.NavigableMap;
+import java.util.NoSuchElementException;
+import java.util.TreeMap;
+import org.jetbrains.annotations.NotNull;
+import org.jetbrains.annotations.TestOnly;
+
+/**
+ * WARNING: Only for test purposes and only for non-distributed (one static instance) storage.
+ */
+public class SimpleInMemoryKeyValueStorage implements KeyValueStorage {
+ private static final Comparator<byte[]> LEXICOGRAPHIC_COMPARATOR = Arrays::compare;
+
+ private static final byte[] TOMBSTONE = new byte[0];
+
+ private static final long LATEST_REV = -1;
+
+ private final Watcher watcher;
+
+ private NavigableMap<byte[], List<Long>> keysIdx = new TreeMap<>(LEXICOGRAPHIC_COMPARATOR);
+
+ private NavigableMap<Long, NavigableMap<byte[], byte[]>> revsIdx = new TreeMap<>();
+
+ private long grev = 0;
+
+ private final Object mux = new Object();
+
+ public SimpleInMemoryKeyValueStorage(Watcher watcher) {
+ this.watcher = watcher;
+ }
+
+ @Override public long revision() {
+ return grev;
+ }
+
+ /**
+  * Stores {@code val} for {@code key} under a newly allocated storage revision and returns the entry
+  * that was the latest one for the key before this update.
+  *
+  * @param key Key bytes.
+  * @param val Value bytes.
+  * @return Previous entry: an empty entry if the key had no recorded revisions, a tombstone entry if the
+  *      latest recorded update of the key was a removal, otherwise the previous regular entry.
+  */
+ @NotNull
+ @Override public Entry put(byte[] key, byte[] val) {
+     synchronized (mux) {
+         long crev = ++grev;
+
+         // Update keysIdx: remember that the key was changed at the new revision.
+         List<Long> revs = keysIdx.computeIfAbsent(key, k -> new ArrayList<>());
+
+         long lrev = revs.isEmpty() ? 0 : lastRevision(revs);
+
+         revs.add(crev);
+
+         // Update revsIdx: a fresh single-entry map is stored for the new revision.
+         NavigableMap<byte[], byte[]> entries = new TreeMap<>(LEXICOGRAPHIC_COMPARATOR);
+
+         entries.put(key, val);
+
+         revsIdx.put(crev, entries);
+
+         //TODO: notify watchers
+
+         // Return previous value.
+         if (lrev == 0)
+             return Entry.empty(key);
+
+         byte[] lastVal = revsIdx.get(lrev).get(key);
+
+         // TOMBSTONE is an internal marker compared by identity; it must be reported as a tombstone entry
+         // rather than leak to the caller as a regular entry with an empty value.
+         return lastVal == TOMBSTONE ? Entry.tombstone(key, lrev) : new Entry(key, lastVal, lrev);
+     }
+ }
+
+ @NotNull
+ @Override public Entry get(byte[] key) {
+ synchronized (mux) {
+ return doGet(key, LATEST_REV);
+ }
+ }
+
+ @NotNull
+ @TestOnly
+ @Override public Entry get(byte[] key, long rev) {
+ synchronized (mux) {
+ return doGet(key, rev);
+ }
+ }
+
+ @NotNull
+ @Override public Entry remove(byte[] key) {
+ synchronized (mux) {
+ Entry e = doGet(key, LATEST_REV);
+
+ if (e.value() == null)
+ return e;
+
+ return put(key, TOMBSTONE);
+ }
+ }
+
+ @Override public Iterator<Entry> iterate(byte[] keyFrom) {
+ synchronized (mux) {
+ NavigableMap<byte[], List<Long>> tailMap = keysIdx.tailMap(keyFrom, true);
+
+ final Iterator<Map.Entry<byte[], List<Long>>> it = tailMap.entrySet().iterator();
+
+ return new Iterator<>() {
+ private Map.Entry<byte[], List<Long>> curr;
+ private boolean hasNext;
+
+ private void advance() {
+ if (it.hasNext()) {
+ Map.Entry<byte[], List<Long>> e = it.next();
+
+ byte[] key = e.getKey();
+
+ if (!isPrefix(keyFrom, key))
+ hasNext = false;
+ else {
+ curr = e;
+
+ hasNext = true;
+ }
+ } else
+ hasNext = false;
+ }
+
+ @Override
+ public boolean hasNext() {
+ synchronized (mux) {
+ if (curr == null)
+ advance();
+
+ return hasNext;
+ }
+ }
+
+ @Override
+ public Entry next() {
+ synchronized (mux) {
+ if (!hasNext())
+ throw new NoSuchElementException();
+
+ Map.Entry<byte[], List<Long>> e = curr;
+
+ curr = null;
+
+ byte[] key = e.getKey();
+
+ List<Long> revs = e.getValue();
+
+ long rev = revs == null || revs.isEmpty() ? 0 : lastRevision(revs);
+
+ if (rev == 0) {
+ throw new IllegalStateException("rev == 0");
+ //return new AbstractMap.SimpleImmutableEntry<>(key, null);
+ }
+
+ NavigableMap<byte[], byte[]> vals = revsIdx.get(rev);
+
+ if (vals == null || vals.isEmpty()) {
+ throw new IllegalStateException("vals == null || vals.isEmpty()");
+ //return new AbstractMap.SimpleImmutableEntry<>(key, null);
+ }
+
+ byte[] val = vals.get(key);
+
+ return val == TOMBSTONE ? Entry.tombstone(key, rev) : new Entry(key, val, rev);
+ }
+ }
+ };
+ }
+ }
+
+ @Override public void compact() {
+ synchronized (mux) {
+ NavigableMap<byte[], List<Long>> compactedKeysIdx = new TreeMap<>(LEXICOGRAPHIC_COMPARATOR);
+
+ NavigableMap<Long, NavigableMap<byte[], byte[]>> compactedRevsIdx = new TreeMap<>();
+
+ keysIdx.forEach((key, revs) -> compactForKey(key, revs, compactedKeysIdx, compactedRevsIdx));
+
+ keysIdx = compactedKeysIdx;
+
+ revsIdx = compactedRevsIdx;
+ }
+ }
+
+ private void compactForKey(
+ byte[] key,
+ List<Long> revs,
+ NavigableMap<byte[], List<Long>> compactedKeysIdx,
+ NavigableMap<Long, NavigableMap<byte[], byte[]>> compactedRevsIdx
+ ) {
+ Long lrev = lastRevision(revs);
+
+ NavigableMap<byte[], byte[]> kv = revsIdx.get(lrev);
+
+ byte[] lastVal = kv.get(key);
+
+ if (lastVal != TOMBSTONE) {
+ compactedKeysIdx.put(key, listOf(lrev));
+
+ NavigableMap<byte[], byte[]> compactedKv = compactedRevsIdx.computeIfAbsent(
+ lrev,
+ k -> new TreeMap<>(LEXICOGRAPHIC_COMPARATOR)
+ );
+
+ compactedKv.put(key, lastVal);
+ }
+ }
+
+ /**
+  * Returns the entry for the given key at the given revision. Callers are expected to hold {@code mux}.
+  *
+  * @param key Key.
+  * @param rev Revision to read, or {@code LATEST_REV} to read the latest revision recorded for the key.
+  * @return Regular or tombstone entry if the key was updated at the resolved revision; an empty entry if
+  *      the key has no recorded revisions, the revision is unknown, or the revision was produced by an
+  *      update of a different key.
+  */
+ @NotNull private Entry doGet(byte[] key, long rev) {
+     List<Long> revs = keysIdx.get(key);
+
+     if (revs == null || revs.isEmpty())
+         return Entry.empty(key);
+
+     long lrev = rev == LATEST_REV ? lastRevision(revs) : rev;
+
+     NavigableMap<byte[], byte[]> entries = revsIdx.get(lrev);
+
+     if (entries == null || entries.isEmpty())
+         return Entry.empty(key);
+
+     byte[] val = entries.get(key);
+
+     // An explicitly requested revision may belong to another key; there is nothing to report for this
+     // key then, and passing null to the Entry constructor would violate its non-null contract.
+     if (val == null)
+         return Entry.empty(key);
+
+     // TOMBSTONE is compared by identity: it is the internal marker instance stored on removal.
+     if (val == TOMBSTONE)
+         return Entry.tombstone(key, lrev);
+
+     return new Entry(key, val, lrev);
+ }
+
+ /**
+  * Checks whether {@code pref} is a prefix of {@code term}, i.e. whether the first {@code pref.length}
+  * bytes of {@code term} are equal to {@code pref}. An empty {@code pref} is a prefix of any array.
+  *
+  * @param pref Candidate prefix.
+  * @param term Array to test.
+  * @return {@code True} if {@code term} starts with {@code pref}, otherwise - {@code false}.
+  */
+ private static boolean isPrefix(byte[] pref, byte[] term) {
+     if (pref.length > term.length)
+         return false;
+
+     // Compare the whole prefix, including its last byte.
+     return Arrays.equals(pref, 0, pref.length, term, 0, pref.length);
+ }
+
+ private static long lastRevision(List<Long> revs) {
+ return revs.get(revs.size() - 1);
+ }
+
+ private static List<Long> listOf(long val) {
+ List<Long> res = new ArrayList<>();
+
+ res.add(val);
+
+ return res;
+ }
+
+}
diff --git a/modules/metastorage-server/src/main/java/org/apache/ignite/internal/metastorage/server/Watch.java b/modules/metastorage-server/src/main/java/org/apache/ignite/internal/metastorage/server/Watch.java
new file mode 100644
index 0000000..26cfa5c
--- /dev/null
+++ b/modules/metastorage-server/src/main/java/org/apache/ignite/internal/metastorage/server/Watch.java
@@ -0,0 +1,45 @@
+package org.apache.ignite.internal.metastorage.server;
+
+import org.jetbrains.annotations.Nullable;
+
+import java.util.Arrays;
+import java.util.Comparator;
+
+public class Watch {
+ private static final Comparator<byte[]> CMP = Arrays::compare;
+
+ private static final long ANY_REVISION = -1;
+
+ @Nullable
+ private byte[] startKey;
+
+ @Nullable
+ private byte[] endKey;
+
+ long rev = ANY_REVISION;
+
+ public void startKey(byte[] startKey) {
+ this.startKey = startKey;
+ }
+
+ public void endKey(byte[] endKey) {
+ this.endKey = endKey;
+ }
+
+ public void revision(long rev) {
+ this.rev = rev;
+ }
+
+ public void notify(Entry e) {
+ if (startKey != null && CMP.compare(e.key(), startKey) < 0)
+ return;
+
+ if (endKey != null && CMP.compare(e.key(), endKey) > 0)
+ return;
+
+ if (rev != ANY_REVISION && e.revision() <= rev)
+ return;
+
+ System.out.println("Entry: key=" + new String(e.key()) + ", value=" + new String(e.value()) + ", rev=" + e.revision());
+ }
+}
diff --git a/modules/metastorage-server/src/main/java/org/apache/ignite/internal/metastorage/server/Watcher.java b/modules/metastorage-server/src/main/java/org/apache/ignite/internal/metastorage/server/Watcher.java
new file mode 100644
index 0000000..5516d06
--- /dev/null
+++ b/modules/metastorage-server/src/main/java/org/apache/ignite/internal/metastorage/server/Watcher.java
@@ -0,0 +1,13 @@
+package org.apache.ignite.internal.metastorage.server;
+
+import org.jetbrains.annotations.NotNull;
+
+public interface Watcher {
+ void register(@NotNull Watch watch);
+
+ void notify(@NotNull Entry e);
+
+ //TODO: implement
+ void cancel(@NotNull Watch watch);
+}
+
diff --git a/modules/metastorage-server/src/main/java/org/apache/ignite/internal/metastorage/server/WatcherImpl.java b/modules/metastorage-server/src/main/java/org/apache/ignite/internal/metastorage/server/WatcherImpl.java
new file mode 100644
index 0000000..dc126a0
--- /dev/null
+++ b/modules/metastorage-server/src/main/java/org/apache/ignite/internal/metastorage/server/WatcherImpl.java
@@ -0,0 +1,58 @@
+package org.apache.ignite.internal.metastorage.server;
+
+import org.jetbrains.annotations.NotNull;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+
+public class WatcherImpl implements Watcher {
+ private final BlockingQueue<Entry> queue = new LinkedBlockingQueue<>();
+
+ private final List<Watch> watches = new ArrayList<>();
+
+ private volatile boolean stop;
+
+ private final Object mux = new Object();
+
+ @Override public void register(@NotNull Watch watch) {
+ synchronized (mux) {
+ watches.add(watch);
+ }
+ }
+
+ @Override public void notify(@NotNull Entry e) {
+ queue.offer(e);
+ }
+
+ @Override
+ public void cancel(@NotNull Watch watch) {
+ throw new UnsupportedOperationException("Not implemented yet.");
+ }
+
+ public void shutdown() {
+ stop = true;
+ }
+
+ private class WatcherWorker implements Runnable {
+ @Override public void run() {
+ while (!stop) {
+ try {
+ Entry e = queue.poll(100, TimeUnit.MILLISECONDS);
+
+ if (e != null) {
+ synchronized (mux) {
+ watches.forEach(w -> w.notify(e));
+ }
+ }
+ }
+ catch (InterruptedException interruptedException) {
+ // No-op.
+ }
+ }
+ }
+ }
+}
+
diff --git a/modules/metastorage-server/src/test/java/org/apache/ignite/internal/metastorage/server/SimpleInMemoryKeyValueStorageTest.java b/modules/metastorage-server/src/test/java/org/apache/ignite/internal/metastorage/server/SimpleInMemoryKeyValueStorageTest.java
new file mode 100644
index 0000000..f7fb17e
--- /dev/null
+++ b/modules/metastorage-server/src/test/java/org/apache/ignite/internal/metastorage/server/SimpleInMemoryKeyValueStorageTest.java
@@ -0,0 +1,274 @@
+package org.apache.ignite.internal.metastorage.server;
+
+import org.jetbrains.annotations.NotNull;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.util.Iterator;
+import java.util.Map;
+import java.util.TreeMap;
+
+import static org.junit.jupiter.api.Assertions.assertArrayEquals;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+class SimpleInMemoryKeyValueStorageTest {
+ private KeyValueStorage storage;
+
+ @BeforeEach
+ public void setUp() {
+ storage = new SimpleInMemoryKeyValueStorage(new NoOpWatcher());
+ }
+
+ @Test
+ void putGetRemoveCompact() {
+ byte[] key1 = k(1);
+ byte[] val1_1 = kv(1, 1);
+ byte[] val1_3 = kv(1, 3);
+
+ byte[] key2 = k(2);
+ byte[] val2_2 = kv(2, 2);
+
+ assertEquals(0, storage.revision());
+
+ // Previous entry is empty.
+ Entry emptyEntry = storage.put(key1, val1_1);
+
+ assertEquals(1, storage.revision());
+ assertTrue(emptyEntry.empty());
+
+ // Entry with rev == 1.
+ Entry e1_1 = storage.get(key1);
+
+ assertFalse(e1_1.empty());
+ assertFalse(e1_1.tombstone());
+ assertArrayEquals(key1, e1_1.key());
+ assertArrayEquals(val1_1, e1_1.value());
+ assertEquals(1, e1_1.revision());
+ assertEquals(1, storage.revision());
+
+ // Previous entry is empty.
+ emptyEntry = storage.put(key2, val2_2);
+
+ assertEquals(2, storage.revision());
+ assertTrue(emptyEntry.empty());
+
+ // Entry with rev == 2.
+ Entry e2 = storage.get(key2);
+
+ assertFalse(e2.empty());
+ assertFalse(e2.tombstone());
+ assertArrayEquals(key2, e2.key());
+ assertArrayEquals(val2_2, e2.value());
+ assertEquals(2, e2.revision());
+ assertEquals(2, storage.revision());
+
+ // Previous entry is not empty.
+ e1_1 = storage.put(key1, val1_3);
+
+ assertFalse(e1_1.empty());
+ assertFalse(e1_1.tombstone());
+ assertArrayEquals(key1, e1_1.key());
+ assertArrayEquals(val1_1, e1_1.value());
+ assertEquals(1, e1_1.revision());
+ assertEquals(3, storage.revision());
+
+ // Entry with rev == 3.
+ Entry e1_3 = storage.get(key1);
+
+ assertFalse(e1_3.empty());
+ assertFalse(e1_3.tombstone());
+ assertArrayEquals(key1, e1_3.key());
+ assertArrayEquals(val1_3, e1_3.value());
+ assertEquals(3, e1_3.revision());
+ assertEquals(3, storage.revision());
+
+ // Remove existing entry.
+ Entry e2_2 = storage.remove(key2);
+
+ assertFalse(e2_2.empty());
+ assertFalse(e2_2.tombstone());
+ assertArrayEquals(key2, e2_2.key());
+ assertArrayEquals(val2_2, e2_2.value());
+ assertEquals(2, e2_2.revision());
+ assertEquals(4, storage.revision()); // Storage revision is changed.
+
+ // Remove already removed entry.
+ Entry tombstoneEntry = storage.remove(key2);
+
+ assertFalse(tombstoneEntry.empty());
+ assertTrue(tombstoneEntry.tombstone());
+ assertEquals(4, storage.revision()); // Storage revision is not changed.
+
+ // Compact and check that tombstones are removed.
+ storage.compact();
+
+ assertEquals(4, storage.revision());
+ assertTrue(storage.remove(key2).empty());
+ assertTrue(storage.get(key2).empty());
+
+ // Remove existing entry.
+ e1_3 = storage.remove(key1);
+
+ assertFalse(e1_3.empty());
+ assertFalse(e1_3.tombstone());
+ assertArrayEquals(key1, e1_3.key());
+ assertArrayEquals(val1_3, e1_3.value());
+ assertEquals(3, e1_3.revision());
+ assertEquals(5, storage.revision()); // Storage revision is changed.
+
+ // Remove already removed entry.
+ tombstoneEntry = storage.remove(key1);
+
+ assertFalse(tombstoneEntry.empty());
+ assertTrue(tombstoneEntry.tombstone());
+ assertEquals(5, storage.revision()); // Storage revision is not changed.
+
+ // Compact and check that tombstones are removed.
+ storage.compact();
+
+ assertEquals(5, storage.revision());
+ assertTrue(storage.remove(key1).empty());
+ assertTrue(storage.get(key1).empty());
+ }
+
+ @Test
+ void compact() {
+ assertEquals(0, storage.revision());
+
+ // Compact empty.
+ storage.compact();
+
+ assertEquals(0, storage.revision());
+
+ // Compact non-empty.
+ fill(storage, 1, 1);
+
+ assertEquals(1, storage.revision());
+
+ fill(storage, 2, 2);
+
+ assertEquals(3, storage.revision());
+
+ fill(storage, 3, 3);
+
+ assertEquals(6, storage.revision());
+
+ storage.remove(k(3));
+
+ assertEquals(7, storage.revision());
+ assertTrue(storage.get(k(3)).tombstone());
+
+ storage.compact();
+
+ assertEquals(7, storage.revision());
+
+ Entry e1 = storage.get(k(1));
+
+ assertFalse(e1.empty());
+ assertFalse(e1.tombstone());
+ assertArrayEquals(k(1), e1.key());
+ assertArrayEquals(kv(1,1), e1.value());
+ assertEquals(1, e1.revision());
+
+ Entry e2 = storage.get(k(2));
+
+ assertFalse(e2.empty());
+ assertFalse(e2.tombstone());
+ assertArrayEquals(k(2), e2.key());
+ assertArrayEquals(kv(2,2), e2.value());
+ assertTrue(storage.get(k(2), 2).empty());
+ assertEquals(3, e2.revision());
+
+ Entry e3 = storage.get(k(3));
+
+ assertTrue(e3.empty());
+ assertTrue(storage.get(k(3), 5).empty());
+ assertTrue(storage.get(k(3), 6).empty());
+ assertTrue(storage.get(k(3), 7).empty());
+ }
+
+ @Test
+ void iterate() {
+ TreeMap<String, String> expFooMap = new TreeMap<>();
+ TreeMap<String, String> expKeyMap = new TreeMap<>();
+ TreeMap<String, String> expZooMap = new TreeMap<>();
+
+ fill("foo", storage, expFooMap);
+ fill("key", storage, expKeyMap);
+ fill("zoo", storage, expZooMap);
+
+ assertEquals(300, storage.revision());
+
+ assertIterate("key", storage, expKeyMap);
+ assertIterate("zoo", storage, expZooMap);
+ assertIterate("foo", storage, expFooMap);
+ }
+
+ private void assertIterate(String pref, KeyValueStorage storage, TreeMap<String, String> expMap) {
+ Iterator<Entry> it = storage.iterate((pref + "_").getBytes());
+ Iterator<Map.Entry<String, String>> expIt = expMap.entrySet().iterator();
+
+ // Order.
+ while (it.hasNext()) {
+ Entry entry = it.next();
+ Map.Entry<String, String> expEntry = expIt.next();
+
+ assertEquals(expEntry.getKey(), new String(entry.key()));
+ assertEquals(expEntry.getValue(), new String(entry.value()));
+ }
+
+ // Range boundaries.
+ it = storage.iterate((pref + '_').getBytes());
+
+ while (it.hasNext()) {
+ Entry entry = it.next();
+
+ assertTrue(expMap.containsKey(new String(entry.key())));
+ }
+ }
+
+ private static void fill(String pref, KeyValueStorage storage, TreeMap<String, String> expMap) {
+ for (int i = 0; i < 100; i++) {
+ String keyStr = pref + '_' + i;
+
+ String valStr = "val_" + i;
+
+ expMap.put(keyStr, valStr);
+
+ byte[] key = keyStr.getBytes();
+
+ byte[] val = valStr.getBytes();
+
+ storage.put(key, val);
+ }
+ }
+
+ private static void fill(KeyValueStorage storage, int keySuffix, int num) {
+ for (int i = 0; i < num; i++)
+ storage.put(k(keySuffix), kv(keySuffix, i + 1));
+ }
+
+ private static byte[] k(int k) {
+ return ("key" + k).getBytes();
+ }
+
+ private static byte[] kv(int k, int v) {
+ return ("key" + k + '_' + "val" + v).getBytes();
+ }
+
+ private static class NoOpWatcher implements Watcher {
+ @Override public void register(@NotNull Watch watch) {
+ // No-op.
+ }
+
+ @Override public void notify(@NotNull Entry e) {
+ // No-op.
+ }
+
+ @Override public void cancel(@NotNull Watch watch) {
+ // No-op.
+ }
+ }
+}
\ No newline at end of file
diff --git a/pom.xml b/pom.xml
index 0f89644..3e7da2e 100644
--- a/pom.xml
+++ b/pom.xml
@@ -46,6 +46,7 @@
<module>modules/metastorage</module>
<module>modules/metastorage-client</module>
<module>modules/metastorage-common</module>
+ <module>modules/metastorage-server</module>
<module>modules/network</module>
<module>modules/raft</module>
<module>modules/raft-client</module>