You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by ag...@apache.org on 2021/04/06 19:26:24 UTC

[ignite-3] branch ignite-14389 updated (18b212f -> b88359d)

This is an automated email from the ASF dual-hosted git repository.

agura pushed a change to branch ignite-14389
in repository https://gitbox.apache.org/repos/asf/ignite-3.git.


 discard 18b212f  IGNITE-14398: Meta storage: added update counter
 discard f3f3f47  IGNITE-14389 Meta storage: in-memory implementation WIP
     add 4d82a7e  IGNITE-14438 Added README.md for cli and cli-common modules. Fixes #75
     add e9ef6d4  IGNITE-14295 Network messages refactored to have a common interface and proper serializers/deserializers. (#70)
     add 297468b  IGNITE-14180 Implemented the ability to subscribe to configuration updates. (#76)
     add ed2b820  IGNITE-14460 Single-node RAFT server (#80).
     add 15166f8  IGNITE-14457 Update Ignite 3 binary build structure (#79)
     new a531baa  IGNITE-14389 Meta storage: in-memory implementation WIP
     new ea1a97a  IGNITE-14398: Meta storage: added update counter
     new b88359d  IGNITE-14389 Added get and do smth semantic

This update added new revisions after undoing existing revisions.
That is to say, some revisions that were in the old version of the
branch are not in the new version.  This situation occurs
when a user --force pushes a change and generates a repository
containing something like this:

 * -- * -- B -- O -- O -- O   (18b212f)
            \
             N -- N -- N   refs/heads/ignite-14389 (b88359d)

You should already have received notification emails for all of the O
revisions, and so the following emails describe only the N revisions
from the common base, B.

Any revisions marked "omit" are not gone; other references still
refer to them.  Any revisions marked "discard" are gone forever.

The 3 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 assembly/NOTICE                                    |   0
 assembly/README                                    |   0
 assembly/assembly.xml                              |  50 ++++
 modules/cli-common/README.md                       |   5 +
 modules/cli/README.md                              |  32 +++
 modules/configuration-annotation-processor/pom.xml |  12 -
 .../processor/internal/Processor.java              |  31 +--
 .../configuration/processor/internal/Utils.java    | 109 +--------
 .../storage => }/ConfigurationChangerTest.java     |  28 ++-
 .../internal/util/ConfigurationUtilTest.java       |  82 ++++++-
 .../internal/validation/ValidationUtilTest.java    |   2 +-
 .../notifications/ConfigurationListenerTest.java   | 245 +++++++++++++++++++
 .../sample/LocalConfigurationSchema.java           |   2 +-
 .../sample/NetworkConfigurationSchema.java         |   2 +-
 .../ignite/configuration/sample/UsageTest.java     |  16 +-
 .../storage/TestConfigurationStorage.java          |  25 +-
 modules/configuration/pom.xml                      |  13 -
 .../ignite/configuration/ConfigurationChanger.java | 195 +++++++++------
 .../configuration/ConfigurationProperty.java       |  14 +-
 .../configuration/ConfigurationRegistry.java       |  55 ++++-
 .../ignite/configuration/ConfigurationTree.java    |   4 +-
 .../ignite/configuration/ConfigurationValue.java   |   6 +-
 .../configuration/NamedConfigurationTree.java      |   9 +-
 .../ignite/configuration/PropertyListener.java     |  61 -----
 .../configuration/internal/ConfigurationNode.java  |  19 +-
 .../internal/DynamicConfiguration.java             |   9 +-
 .../configuration/internal/DynamicProperty.java    |  16 +-
 .../internal/NamedListConfiguration.java           |  25 +-
 .../ConfigurationNotificationEventImpl.java}       |  43 ++--
 .../util/ConfigurationNotificationsUtil.java       | 184 ++++++++++++++
 .../internal/util/ConfigurationUtil.java           |  42 ++++
 .../notifications/ConfigurationListener.java}      |  20 +-
 .../ConfigurationNamedListListener.java}           |  26 +-
 .../ConfigurationNotificationEvent.java}           |  36 ++-
 .../storage/ConfigurationStorage.java              |  16 +-
 .../storage/ConfigurationStorageListener.java      |   1 +
 .../apache/ignite/configuration/storage/Data.java  |  22 +-
 .../ignite/configuration/tree/NamedListNode.java   |   5 +
 .../ignite/configuration/tree/NamedListView.java   |   5 +
 .../metastorage/server/KeyValueStorage.java        |  12 +-
 .../server/SimpleInMemoryKeyValueStorage.java      | 147 ++++++++----
 .../server/SimpleInMemoryKeyValueStorageTest.java  | 235 ++++++++++++++++--
 modules/network/pom.xml                            |  12 -
 .../ITScaleCubeNetworkClusterMessagingTest.java    | 107 ++++++++-
 .../ignite/network/scalecube/TestMessage.java      |  16 +-
 .../scalecube/TestMessageMapperProvider.java       |  54 +++++
 .../scalecube/TestNetworkHandlersProvider.java     |   4 +-
 .../ignite/network/scalecube/TestRequest.java}     |  33 ++-
 .../scalecube/TestRequestMapperProvider.java       |  54 +++++
 .../ignite/network/scalecube/TestResponse.java}    |  34 ++-
 .../scalecube/TestResponseMapperProvider.java      |  54 +++++
 .../java/org/apache/ignite/network/Network.java    |  66 ++++++
 .../org/apache/ignite/network/NetworkCluster.java  |  25 +-
 .../ignite/network/NetworkClusterContext.java      |  57 +++++
 .../ignite/network/NetworkClusterFactory.java      |  56 +----
 .../network/NetworkConfigurationException.java}    |  17 +-
 ...rkMessageHandler.java => NetworkException.java} |  22 +-
 .../org/apache/ignite/network/NetworkMember.java   |   3 +-
 .../org/apache/ignite/network/NetworkMessage.java  |  59 -----
 .../ignite/network/NetworkMessageHandler.java      |   6 +-
 .../ignite/network/internal/MessageReader.java}    |  16 +-
 .../ignite/network/internal/MessageWriter.java}    |  16 +-
 .../ignite/network/message/AckResponse.java}       |  10 +-
 .../message/DefaultMessageMapperProvider.java      |  54 +++++
 .../network/message/MessageDeserializer.java}      |  19 +-
 .../network/message/MessageMapperProvider.java}    |  30 +--
 .../network/message/MessageMappingException.java}  |  17 +-
 .../ignite/network/message/MessageSerializer.java} |  28 +--
 .../ignite/network/message/NetworkMessage.java}    |  11 +-
 .../network/scalecube/ScaleCubeMessageCodec.java   | 133 +++++++++++
 .../network/scalecube/ScaleCubeMessageHandler.java |   8 +-
 .../network/scalecube/ScaleCubeMessageReader.java} |  26 +-
 .../network/scalecube/ScaleCubeMessageWriter.java} |  26 +-
 .../network/scalecube/ScaleCubeNetworkCluster.java |  37 ++-
 .../ScaleCubeNetworkClusterFactory.java}           |  71 ++++--
 .../org/apache/ignite/raft/client/Command.java     |   4 +-
 .../java/org/apache/ignite/raft/client/Peer.java   |   3 +-
 .../apache/ignite/raft/client/RaftErrorCode.java   |  10 +-
 .../ignite/raft/client/message/ActionRequest.java  |   4 +-
 .../ignite/raft/client/message/ActionResponse.java |   5 +-
 .../raft/client/message/AddLearnersRequest.java    |   4 +-
 .../raft/client/message/AddPeersRequest.java       |   4 +-
 .../raft/client/message/ChangePeersResponse.java   |   4 +-
 .../raft/client/message/GetLeaderRequest.java      |   5 +-
 .../raft/client/message/GetLeaderResponse.java     |   4 +-
 .../raft/client/message/GetPeersRequest.java       |   5 +-
 .../raft/client/message/GetPeersResponse.java      |   4 +-
 .../{impl => }/RaftClientMessageFactory.java       |  17 +-
 .../raft/client/message/RaftErrorResponse.java     |   4 +-
 .../raft/client/message/RemoveLearnersRequest.java |   4 +-
 .../raft/client/message/RemovePeersRequest.java    |   4 +-
 .../raft/client/message/SnapshotRequest.java       |   5 +-
 .../client/message/TransferLeadershipRequest.java  |   4 +-
 .../client/message/impl/ActionRequestImpl.java     |   5 +
 .../client/message/impl/ActionResponseImpl.java    |   5 +
 .../message/impl/AddLearnersRequestImpl.java       |   5 +
 .../client/message/impl/AddPeersRequestImpl.java   |   5 +
 .../message/impl/ChangePeersResponseImpl.java      |   5 +
 .../client/message/impl/GetLeaderRequestImpl.java  |   5 +
 .../client/message/impl/GetLeaderResponseImpl.java |   5 +
 .../client/message/impl/GetPeersRequestImpl.java   |   5 +
 .../client/message/impl/GetPeersResponseImpl.java  |   5 +
 .../message/impl/RaftClientMessageFactoryImpl.java |   1 +
 .../client/message/impl/RaftErrorResponseImpl.java |   5 +
 .../message/impl/RemoveLearnersRequestImpl.java    |   5 +
 .../message/impl/RemovePeersRequestImpl.java       |   5 +
 .../client/message/impl/SnapshotRequestImpl.java   |   5 +
 .../impl/TransferLeadershipRequestImpl.java        |   5 +
 .../CommandClosure.java}                           |  23 +-
 .../client/service/RaftGroupCommandListener.java   |   4 +-
 .../client/service/impl/RaftGroupServiceImpl.java  | 149 +++++-------
 .../raft/client/service/RaftGroupServiceTest.java  |  26 +-
 modules/{raft-client => raft}/pom.xml              |  21 +-
 .../ignite/raft/server/CounterCommandListener.java |  53 +++++
 .../ignite/raft/server/GetValueCommand.java}       |  10 +-
 .../raft/server/ITRaftCounterServerTest.java       | 186 +++++++++++++++
 .../raft/server/IncrementAndGetCommand.java}       |  26 +-
 .../org/apache/ignite/raft/server/RaftServer.java  |  56 +++++
 .../ignite/raft/server/impl/RaftServerImpl.java    | 264 +++++++++++++++++++++
 .../InMemoryConfigurationStorage.java              |  25 +-
 .../rest/presentation/json/JsonConverterTest.java  |   7 +
 .../json/TestConfigurationStorage.java             |  14 +-
 parent/pom.xml                                     |   7 -
 pom.xml                                            |  25 ++
 124 files changed, 3072 insertions(+), 1031 deletions(-)
 create mode 100644 assembly/NOTICE
 create mode 100644 assembly/README
 create mode 100644 assembly/assembly.xml
 create mode 100644 modules/cli-common/README.md
 create mode 100644 modules/cli/README.md
 rename modules/configuration-annotation-processor/src/test/java/org/apache/ignite/configuration/{sample/storage => }/ConfigurationChangerTest.java (87%)
 create mode 100644 modules/configuration-annotation-processor/src/test/java/org/apache/ignite/configuration/notifications/ConfigurationListenerTest.java
 rename modules/configuration-annotation-processor/src/test/java/org/apache/ignite/configuration/{sample => }/storage/TestConfigurationStorage.java (80%)
 delete mode 100644 modules/configuration/src/main/java/org/apache/ignite/configuration/PropertyListener.java
 copy modules/{raft-client/src/main/java/org/apache/ignite/raft/client/message/impl/ActionRequestImpl.java => configuration/src/main/java/org/apache/ignite/configuration/internal/notifications/ConfigurationNotificationEventImpl.java} (51%)
 create mode 100644 modules/configuration/src/main/java/org/apache/ignite/configuration/internal/util/ConfigurationNotificationsUtil.java
 copy modules/{schema/src/main/java/org/apache/ignite/internal/util/Factory.java => configuration/src/main/java/org/apache/ignite/configuration/notifications/ConfigurationListener.java} (61%)
 copy modules/{metastorage-common/src/main/java/org/apache/ignite/metastorage/common/WatchListener.java => configuration/src/main/java/org/apache/ignite/configuration/notifications/ConfigurationNamedListListener.java} (53%)
 copy modules/{metastorage-common/src/main/java/org/apache/ignite/metastorage/common/Entry.java => configuration/src/main/java/org/apache/ignite/configuration/notifications/ConfigurationNotificationEvent.java} (53%)
 create mode 100644 modules/network/src/integrationTest/java/org/apache/ignite/network/scalecube/TestMessageMapperProvider.java
 copy modules/{schema/src/main/java/org/apache/ignite/internal/schema/IndexColumnImpl.java => network/src/integrationTest/java/org/apache/ignite/network/scalecube/TestRequest.java} (59%)
 create mode 100644 modules/network/src/integrationTest/java/org/apache/ignite/network/scalecube/TestRequestMapperProvider.java
 copy modules/{api/src/main/java/org/apache/ignite/schema/builder/HashIndexBuilder.java => network/src/integrationTest/java/org/apache/ignite/network/scalecube/TestResponse.java} (58%)
 create mode 100644 modules/network/src/integrationTest/java/org/apache/ignite/network/scalecube/TestResponseMapperProvider.java
 create mode 100644 modules/network/src/main/java/org/apache/ignite/network/Network.java
 create mode 100644 modules/network/src/main/java/org/apache/ignite/network/NetworkClusterContext.java
 copy modules/{configuration/src/main/java/org/apache/ignite/configuration/ConfigurationChangeException.java => network/src/main/java/org/apache/ignite/network/NetworkConfigurationException.java} (68%)
 copy modules/network/src/main/java/org/apache/ignite/network/{NetworkMessageHandler.java => NetworkException.java} (70%)
 delete mode 100644 modules/network/src/main/java/org/apache/ignite/network/NetworkMessage.java
 copy modules/{api/src/main/java/org/apache/ignite/binary/BinaryObject.java => network/src/main/java/org/apache/ignite/network/internal/MessageReader.java} (74%)
 copy modules/{api/src/main/java/org/apache/ignite/binary/BinaryObject.java => network/src/main/java/org/apache/ignite/network/internal/MessageWriter.java} (74%)
 copy modules/{api/src/main/java/org/apache/ignite/schema/HashIndex.java => network/src/main/java/org/apache/ignite/network/message/AckResponse.java} (76%)
 create mode 100644 modules/network/src/main/java/org/apache/ignite/network/message/DefaultMessageMapperProvider.java
 copy modules/{api/src/main/java/org/apache/ignite/schema/PrimaryIndex.java => network/src/main/java/org/apache/ignite/network/message/MessageDeserializer.java} (65%)
 copy modules/{api/src/main/java/org/apache/ignite/schema/Column.java => network/src/main/java/org/apache/ignite/network/message/MessageMapperProvider.java} (62%)
 copy modules/{configuration/src/main/java/org/apache/ignite/configuration/ConfigurationChangeException.java => network/src/main/java/org/apache/ignite/network/message/MessageMappingException.java} (67%)
 copy modules/{schema/src/main/java/org/apache/ignite/internal/schema/marshaller/SerializationException.java => network/src/main/java/org/apache/ignite/network/message/MessageSerializer.java} (65%)
 copy modules/{api/src/main/java/org/apache/ignite/schema/SchemaObject.java => network/src/main/java/org/apache/ignite/network/message/NetworkMessage.java} (81%)
 create mode 100644 modules/network/src/main/java/org/apache/ignite/network/scalecube/ScaleCubeMessageCodec.java
 copy modules/{configuration/src/main/java/org/apache/ignite/configuration/validation/ValidationIssue.java => network/src/main/java/org/apache/ignite/network/scalecube/ScaleCubeMessageReader.java} (65%)
 copy modules/{configuration/src/main/java/org/apache/ignite/configuration/validation/ValidationIssue.java => network/src/main/java/org/apache/ignite/network/scalecube/ScaleCubeMessageWriter.java} (65%)
 copy modules/network/src/main/java/org/apache/ignite/network/{NetworkClusterFactory.java => scalecube/ScaleCubeNetworkClusterFactory.java} (50%)
 rename modules/raft-client/src/main/java/org/apache/ignite/raft/client/message/{impl => }/RaftClientMessageFactory.java (70%)
 copy modules/raft-client/src/main/java/org/apache/ignite/raft/client/{ElectionPriority.java => service/CommandClosure.java} (65%)
 copy modules/{raft-client => raft}/pom.xml (78%)
 create mode 100644 modules/raft/src/integrationTest/java/org/apache/ignite/raft/server/CounterCommandListener.java
 copy modules/{raft-client/src/main/java/org/apache/ignite/raft/client/WriteCommand.java => raft/src/integrationTest/java/org/apache/ignite/raft/server/GetValueCommand.java} (84%)
 create mode 100644 modules/raft/src/integrationTest/java/org/apache/ignite/raft/server/ITRaftCounterServerTest.java
 copy modules/{raft-client/src/main/java/org/apache/ignite/raft/client/WriteCommand.java => raft/src/integrationTest/java/org/apache/ignite/raft/server/IncrementAndGetCommand.java} (65%)
 create mode 100644 modules/raft/src/main/java/org/apache/ignite/raft/server/RaftServer.java
 create mode 100644 modules/raft/src/main/java/org/apache/ignite/raft/server/impl/RaftServerImpl.java

[ignite-3] 01/03: IGNITE-14389 Meta storage: in-memory implementation WIP

Posted by ag...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

agura pushed a commit to branch ignite-14389
in repository https://gitbox.apache.org/repos/asf/ignite-3.git

commit a531baae20ecaaff9a06bb13be97feebd1a8a7db
Author: Andrey Gura <ag...@apache.org>
AuthorDate: Tue Mar 30 21:21:07 2021 +0300

    IGNITE-14389 Meta storage: in-memory implementation WIP
---
 modules/metastorage-server/pom.xml                 |  60 +++++
 .../ignite/internal/metastorage/server/Entry.java  | 146 +++++++++++
 .../metastorage/server/KeyValueStorage.java        |  28 +++
 .../server/SimpleInMemoryKeyValueStorage.java      | 267 ++++++++++++++++++++
 .../ignite/internal/metastorage/server/Watch.java  |  45 ++++
 .../internal/metastorage/server/Watcher.java       |  13 +
 .../internal/metastorage/server/WatcherImpl.java   |  58 +++++
 .../server/SimpleInMemoryKeyValueStorageTest.java  | 274 +++++++++++++++++++++
 pom.xml                                            |   1 +
 9 files changed, 892 insertions(+)

diff --git a/modules/metastorage-server/pom.xml b/modules/metastorage-server/pom.xml
new file mode 100644
index 0000000..3c51fc5
--- /dev/null
+++ b/modules/metastorage-server/pom.xml
@@ -0,0 +1,60 @@
+<?xml version="1.0" encoding="UTF-8"?>
+
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements.  See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+  -->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+
+    <parent>
+        <groupId>org.apache.ignite</groupId>
+        <artifactId>ignite-parent</artifactId>
+        <version>1</version>
+        <relativePath>../../parent/pom.xml</relativePath>
+    </parent>
+
+    <artifactId>metastorage-server</artifactId>
+    <version>3.0.0-SNAPSHOT</version>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.ignite</groupId>
+            <artifactId>ignite-core</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.jetbrains</groupId>
+            <artifactId>annotations</artifactId>
+        </dependency>
+
+        <!-- Test dependencies. -->
+        <dependency>
+            <groupId>org.junit.jupiter</groupId>
+            <artifactId>junit-jupiter-api</artifactId>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.junit.jupiter</groupId>
+            <artifactId>junit-jupiter-engine</artifactId>
+            <scope>test</scope>
+        </dependency>
+    </dependencies>
+</project>
diff --git a/modules/metastorage-server/src/main/java/org/apache/ignite/internal/metastorage/server/Entry.java b/modules/metastorage-server/src/main/java/org/apache/ignite/internal/metastorage/server/Entry.java
new file mode 100644
index 0000000..442aef9
--- /dev/null
+++ b/modules/metastorage-server/src/main/java/org/apache/ignite/internal/metastorage/server/Entry.java
@@ -0,0 +1,146 @@
+package org.apache.ignite.internal.metastorage.server;
+
+import org.jetbrains.annotations.NotNull;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Represents a storage unit as an entry with a key, a value and a revision, where
+ * <ul>
+ *     <li>key - a unique entry key represented by an array of bytes. Keys are compared lexicographically.</li>
+ *     <li>value - data which is associated with a key and represented as an array of bytes.</li>
+ *     <li>revision - a number which denotes a version of the whole meta storage. Each change increments the revision.</li>
+ * </ul>
+ *
+ * An instance of {@link Entry} can represent:
+ * <ul>
+ *     <li>A regular entry which stores a particular key, a value and a revision number.</li>
+ *     <li>An empty entry which denotes the absence of a regular entry in the meta storage for a given key.
+ *     The revision is 0 for this kind of entry.</li>
+ *     <li>A tombstone entry which denotes that a regular entry for a given key was removed from the storage at some revision.</li>
+ * </ul>
+ */
+//TODO: Separate client and server entries. Empty and tombstone for client is the same.
+public class Entry {
+    /** Entry key. Couldn't be {@code null}. */
+    @NotNull
+    final private byte[] key;
+
+    /**
+     * Entry value.
+     * <p>
+     *     {@code val == null} only for {@link #empty()} and {@link #tombstone()} entries.
+     * </p>
+     */
+    @Nullable
+    final private byte[] val;
+
+    /**
+     * Revision number corresponding to this particular entry.
+     * <p>
+     *     {@code rev == 0} for {@link #empty()} entry,
+     *     {@code rev > 0} for regular and {@link #tombstone()} entries.
+     * </p>
+     */
+    final private long rev;
+
+    /**
+     * Constructor.
+     *
+     * @param key Key bytes. Couldn't be {@code null}.
+     * @param val Value bytes. Couldn't be {@code null}.
+     * @param rev Revision.
+     */
+    // TODO: It seems user will never create Entry, so we can reduce constructor scope to protected or package-private and reuse it from two-place private constructor.
+    public Entry(@NotNull byte[] key, @NotNull byte[] val, long rev) {
+        assert key != null : "key can't be null";
+        assert val != null : "value can't be null";
+
+        this.key = key;
+        this.val = val;
+        this.rev = rev;
+    }
+
+    /**
+     * Constructor for empty and tombstone entries.
+     *
+     * @param key Key bytes. Couldn't be {@code null}.
+     * @param rev Revision.
+     */
+    private Entry(@NotNull byte[] key, long rev) {
+        assert key != null : "key can't be null";
+
+        this.key = key;
+        this.val = null;
+        this.rev = rev;
+    }
+
+    /**
+     * Creates an instance of empty entry for a given key.
+     *
+     * @param key Key bytes. Couldn't be {@code null}.
+     * @return Empty entry.
+     */
+    @NotNull
+    public static Entry empty(byte[] key) {
+        return new Entry(key, 0);
+    }
+
+    /**
+     * Creates an instance of tombstone entry for a given key and a revision.
+     *
+     * @param key Key bytes. Couldn't be {@code null}.
+     * @param rev Revision of the tombstone entry. Must be positive (checked with an assertion).
+     * @return Tombstone entry.
+     */
+    @NotNull
+    public static Entry tombstone(byte[] key, long rev) {
+        assert rev > 0 : "rev must be positive for tombstone entry.";
+
+        return new Entry(key, rev);
+    }
+
+    /**
+     * Returns a key.
+     *
+     * @return Key.
+     */
+    @NotNull
+    public byte[] key() {
+        return key;
+    }
+
+    /**
+     * Returns a value.
+     *
+     * @return Value.
+     */
+    @Nullable
+    public byte[] value() {
+        return val;
+    }
+
+    /**
+     * Returns a revision.
+     * @return Revision.
+     */
+    public long revision() {
+        return rev;
+    }
+
+    /**
+     * Returns value which denotes whether entry is tombstone or not.
+     *
+     * @return {@code True} if entry is tombstone, otherwise - {@code false}.
+     */
+    public boolean tombstone() {
+        return val == null && rev > 0;
+    }
+
+    /**
+     * Returns value which denotes whether entry is empty or not.
+     *
+     * @return {@code True} if entry is empty, otherwise - {@code false}.
+     */
+    public boolean empty() {
+        return val == null && rev == 0;
+    }
+}
diff --git a/modules/metastorage-server/src/main/java/org/apache/ignite/internal/metastorage/server/KeyValueStorage.java b/modules/metastorage-server/src/main/java/org/apache/ignite/internal/metastorage/server/KeyValueStorage.java
new file mode 100644
index 0000000..1bf6b78
--- /dev/null
+++ b/modules/metastorage-server/src/main/java/org/apache/ignite/internal/metastorage/server/KeyValueStorage.java
@@ -0,0 +1,28 @@
+package org.apache.ignite.internal.metastorage.server;
+
+import org.jetbrains.annotations.NotNull;
+
+import java.util.Iterator;
+
+/**
+ * Key-value storage of the meta storage server. Keys and values are byte arrays and results are
+ * returned as {@link Entry} instances which also carry a revision number.
+ * <p>
+ * NOTE(review): the per-method notes below describe what {@link SimpleInMemoryKeyValueStorage} does;
+ * confirm whether they are meant to be the contract for every implementation.
+ */
+public interface KeyValueStorage {
+
+    /**
+     * Returns the current revision of the storage.
+     *
+     * @return Storage revision.
+     */
+    long revision();
+
+    /**
+     * Stores a value for the given key.
+     *
+     * @param key Key bytes.
+     * @param value Value bytes.
+     * @return Entry for the key. {@link SimpleInMemoryKeyValueStorage} returns the entry that was stored for the key
+     *      before the update, or an empty entry if there was none.
+     */
+    @NotNull
+    Entry put(byte[] key, byte[] value);
+
+    /**
+     * Returns the entry for the given key. {@link SimpleInMemoryKeyValueStorage} looks it up at the last revision
+     * recorded for the key.
+     *
+     * @param key Key bytes.
+     * @return Entry for the key.
+     */
+    @NotNull
+    Entry get(byte[] key);
+
+    /**
+     * Returns the entry for the given key and revision.
+     *
+     * @param key Key bytes.
+     * @param rev Revision.
+     * @return Entry for the key.
+     */
+    @NotNull
+    Entry get(byte[] key, long rev);
+
+    /**
+     * Removes the given key. {@link SimpleInMemoryKeyValueStorage} returns the current entry untouched when it has
+     * no value (empty or tombstone); otherwise it writes a tombstone marker and returns the previous entry.
+     *
+     * @param key Key bytes.
+     * @return Entry for the key.
+     */
+    @NotNull
+    Entry remove(byte[] key);
+
+    /**
+     * Returns an iterator over entries starting from the given key. {@link SimpleInMemoryKeyValueStorage} walks keys
+     * in lexicographic order and stops at the first key that does not pass its prefix check against {@code key}.
+     *
+     * @param key Key bytes to start from.
+     * @return Iterator over entries.
+     */
+    Iterator<Entry> iterate(byte[] key);
+
+    //Iterator<Entry> iterate(long rev);
+
+    /**
+     * Compacts the storage. {@link SimpleInMemoryKeyValueStorage} keeps only the last revision of every key and
+     * drops keys whose last value is a tombstone marker.
+     */
+    void compact();
+}
diff --git a/modules/metastorage-server/src/main/java/org/apache/ignite/internal/metastorage/server/SimpleInMemoryKeyValueStorage.java b/modules/metastorage-server/src/main/java/org/apache/ignite/internal/metastorage/server/SimpleInMemoryKeyValueStorage.java
new file mode 100644
index 0000000..9059aec
--- /dev/null
+++ b/modules/metastorage-server/src/main/java/org/apache/ignite/internal/metastorage/server/SimpleInMemoryKeyValueStorage.java
@@ -0,0 +1,267 @@
+package org.apache.ignite.internal.metastorage.server;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Comparator;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.NavigableMap;
+import java.util.NoSuchElementException;
+import java.util.TreeMap;
+import org.jetbrains.annotations.NotNull;
+import org.jetbrains.annotations.TestOnly;
+
+/**
+ * WARNING: Only for test purposes and only for non-distributed (one static instance) storage.
+ */
+public class SimpleInMemoryKeyValueStorage implements KeyValueStorage {
+    private static final Comparator<byte[]> LEXICOGRAPHIC_COMPARATOR = Arrays::compare;
+
+    private static final byte[] TOMBSTONE = new byte[0];
+
+    private static final long LATEST_REV = -1;
+
+    private final Watcher watcher;
+
+    private NavigableMap<byte[], List<Long>> keysIdx = new TreeMap<>(LEXICOGRAPHIC_COMPARATOR);
+
+    private NavigableMap<Long, NavigableMap<byte[], byte[]>> revsIdx = new TreeMap<>();
+
+    private long grev = 0;
+
+    private final Object mux = new Object();
+
+    public SimpleInMemoryKeyValueStorage(Watcher watcher) {
+        this.watcher = watcher;
+    }
+
+    /**
+     * Returns the current (latest) revision of the storage.
+     * <p>
+     * The counter is read under {@code mux}, the same monitor that guards its increment in
+     * {@link #put(byte[], byte[])}, so the caller never observes a stale or torn {@code long} value.
+     *
+     * @return Current storage revision; {@code 0} if nothing has been written yet.
+     */
+    @Override public long revision() {
+        synchronized (mux) {
+            return grev;
+        }
+    }
+
+    /**
+     * Stores the value for the key under a new storage revision and returns the entry that was
+     * associated with the key before the update.
+     *
+     * @param key Key bytes.
+     * @param val Value bytes.
+     * @return Previous entry for the key: an empty entry if the key had no revisions, a tombstone entry
+     *      if the previous value was the removal marker, otherwise a regular entry.
+     */
+    @NotNull
+    @Override public Entry put(byte[] key, byte[] val) {
+        synchronized (mux) {
+            long crev = ++grev;
+
+            // Update keysIdx.
+            List<Long> revs = keysIdx.computeIfAbsent(key, k -> new ArrayList<>());
+
+            long lrev = revs.isEmpty() ? 0 : lastRevision(revs);
+
+            revs.add(crev);
+
+            // Update revsIdx.
+            NavigableMap<byte[], byte[]> entries = new TreeMap<>(LEXICOGRAPHIC_COMPARATOR);
+
+            entries.put(key, val);
+
+            revsIdx.put(crev, entries);
+
+            // Return previous value.
+            if (lrev == 0)
+                return Entry.empty(key);
+
+            byte[] lastVal = revsIdx.get(lrev).get(key);
+
+            //TODO: notify watchers
+
+            // The removal marker must not leak to the caller as a regular value: report it as a tombstone,
+            // the same way doGet() and iterate() do.
+            return lastVal == TOMBSTONE ? Entry.tombstone(key, lrev) : new Entry(key, lastVal, lrev);
+        }
+    }
+
+    @NotNull
+    @Override public Entry get(byte[] key) {
+        synchronized (mux) {
+            return doGet(key, LATEST_REV);
+        }
+    }
+
+    @NotNull
+    @TestOnly
+    @Override public Entry get(byte[] key, long rev) {
+        synchronized (mux) {
+            return doGet(key, rev);
+        }
+    }
+
+    @NotNull
+    @Override public Entry remove(byte[] key) {
+        synchronized (mux) {
+            Entry e = doGet(key, LATEST_REV);
+
+            if (e.value() == null)
+                return e;
+
+            return put(key, TOMBSTONE);
+        }
+    }
+
+    @Override public Iterator<Entry> iterate(byte[] keyFrom) {
+        synchronized (mux) {
+            NavigableMap<byte[], List<Long>> tailMap = keysIdx.tailMap(keyFrom, true);
+
+            final Iterator<Map.Entry<byte[], List<Long>>> it = tailMap.entrySet().iterator();
+
+            return new Iterator<>() {
+                private Map.Entry<byte[], List<Long>> curr;
+                private boolean hasNext;
+
+                private void advance() {
+                    if (it.hasNext()) {
+                        Map.Entry<byte[], List<Long>> e = it.next();
+
+                        byte[] key = e.getKey();
+
+                        if (!isPrefix(keyFrom, key))
+                            hasNext = false;
+                        else {
+                            curr = e;
+
+                            hasNext = true;
+                        }
+                    } else
+                        hasNext = false;
+                }
+
+                @Override
+                public boolean hasNext() {
+                    synchronized (mux) {
+                        if (curr == null)
+                            advance();
+
+                        return hasNext;
+                    }
+                }
+
+                @Override
+                public Entry next() {
+                    synchronized (mux) {
+                        if (!hasNext())
+                            throw new NoSuchElementException();
+
+                        Map.Entry<byte[], List<Long>> e = curr;
+
+                        curr = null;
+
+                        byte[] key = e.getKey();
+
+                        List<Long> revs = e.getValue();
+
+                        long rev = revs == null || revs.isEmpty() ? 0 : lastRevision(revs);
+
+                        if (rev == 0) {
+                            throw new IllegalStateException("rev == 0");
+                            //return new AbstractMap.SimpleImmutableEntry<>(key, null);
+                        }
+
+                        NavigableMap<byte[], byte[]> vals = revsIdx.get(rev);
+
+                        if (vals == null || vals.isEmpty()) {
+                            throw new IllegalStateException("vals == null || vals.isEmpty()");
+                            //return new AbstractMap.SimpleImmutableEntry<>(key, null);
+                        }
+
+                        byte[] val = vals.get(key);
+
+                        return val == TOMBSTONE ? Entry.tombstone(key, rev) : new Entry(key, val, rev);
+                    }
+                }
+            };
+        }
+    }
+
+    @Override public void compact() {
+        synchronized (mux) {
+            NavigableMap<byte[], List<Long>> compactedKeysIdx = new TreeMap<>(LEXICOGRAPHIC_COMPARATOR);
+
+            NavigableMap<Long, NavigableMap<byte[], byte[]>> compactedRevsIdx = new TreeMap<>();
+
+            keysIdx.forEach((key, revs) -> compactForKey(key, revs, compactedKeysIdx, compactedRevsIdx));
+
+            keysIdx = compactedKeysIdx;
+
+            revsIdx = compactedRevsIdx;
+        }
+    }
+
+    private void compactForKey(
+            byte[] key,
+            List<Long> revs,
+            NavigableMap<byte[], List<Long>> compactedKeysIdx,
+            NavigableMap<Long, NavigableMap<byte[], byte[]>> compactedRevsIdx
+    ) {
+        Long lrev = lastRevision(revs);
+
+        NavigableMap<byte[], byte[]> kv = revsIdx.get(lrev);
+
+        byte[] lastVal = kv.get(key);
+
+        if (lastVal != TOMBSTONE) {
+            compactedKeysIdx.put(key, listOf(lrev));
+
+            NavigableMap<byte[], byte[]> compactedKv = compactedRevsIdx.computeIfAbsent(
+                    lrev,
+                    k -> new TreeMap<>(LEXICOGRAPHIC_COMPARATOR)
+            );
+
+            compactedKv.put(key, lastVal);
+        }
+    }
+
+    /**
+     * Returns entry for given key.
+     * <p>
+     * The callers in this class invoke the method while holding {@code mux}.
+     *
+     * @param key Key.
+     * @param rev Revision to read, or {@code LATEST_REV} to read the last revision recorded for the key.
+     * @return Entry for given key: an empty entry if the key is unknown or has no value at the requested
+     *      revision, a tombstone entry if the value at that revision is the removal marker, otherwise a regular entry.
+     */
+    @NotNull private Entry doGet(byte[] key, long rev) {
+        List<Long> revs = keysIdx.get(key);
+
+        if (revs == null || revs.isEmpty())
+            return Entry.empty(key);
+
+        long lrev = rev == LATEST_REV ? lastRevision(revs) : rev;
+
+        NavigableMap<byte[], byte[]> entries = revsIdx.get(lrev);
+
+        if (entries == null || entries.isEmpty())
+            return Entry.empty(key);
+
+        byte[] val = entries.get(key);
+
+        // The requested revision exists but was produced by an update of another key:
+        // there is nothing stored for this key at that exact revision.
+        if (val == null)
+            return Entry.empty(key);
+
+        if (val == TOMBSTONE)
+            return Entry.tombstone(key, lrev);
+
+        return new Entry(key, val, lrev);
+    }
+
+    /**
+     * Checks whether one byte array is a prefix of another.
+     * <p>
+     * All bytes of {@code pref} are compared, including the last one. An empty {@code pref} is a prefix
+     * of any array.
+     *
+     * @param pref Candidate prefix.
+     * @param term Array that is tested for starting with {@code pref}.
+     * @return {@code True} if {@code term} starts with all bytes of {@code pref}, otherwise - {@code false}.
+     */
+    private static boolean isPrefix(byte[] pref, byte[] term) {
+        if (pref.length > term.length)
+            return false;
+
+        // Range comparison over the first pref.length bytes of both arrays.
+        return Arrays.equals(pref, 0, pref.length, term, 0, pref.length);
+    }
+
+    private static long lastRevision(List<Long> revs) {
+        return revs.get(revs.size() - 1);
+    }
+
+    private static  List<Long> listOf(long val) {
+        List<Long> res = new ArrayList<>();
+
+        res.add(val);
+
+        return res;
+    }
+
+}
diff --git a/modules/metastorage-server/src/main/java/org/apache/ignite/internal/metastorage/server/Watch.java b/modules/metastorage-server/src/main/java/org/apache/ignite/internal/metastorage/server/Watch.java
new file mode 100644
index 0000000..26cfa5c
--- /dev/null
+++ b/modules/metastorage-server/src/main/java/org/apache/ignite/internal/metastorage/server/Watch.java
@@ -0,0 +1,45 @@
+package org.apache.ignite.internal.metastorage.server;
+
+import org.jetbrains.annotations.Nullable;
+
+import java.util.Arrays;
+import java.util.Comparator;
+
+/**
+ * Filter over entry updates: an entry is reported only if its key lies within the optional
+ * {@code [startKey, endKey]} bounds and its revision is greater than the optional revision bound.
+ * Reported entries are currently just printed to standard output.
+ */
+public class Watch {
+    /** Lexicographic comparator for keys. */
+    private static final Comparator<byte[]> CMP = Arrays::compare;
+
+    /** Marker meaning that entries are not filtered by revision. */
+    private static final long ANY_REVISION = -1;
+
+    /** Lower key bound (inclusive), {@code null} if there is no lower bound. */
+    @Nullable
+    private byte[] startKey;
+
+    /** Upper key bound (inclusive), {@code null} if there is no upper bound. */
+    @Nullable
+    private byte[] endKey;
+
+    /** Revision bound: only entries with a greater revision are reported. */
+    long rev = ANY_REVISION;
+
+    /**
+     * Sets the lower key bound.
+     *
+     * @param startKey Lower key bound (inclusive) or {@code null} for no bound.
+     */
+    public void startKey(byte[] startKey) {
+        this.startKey = startKey;
+    }
+
+    /**
+     * Sets the upper key bound.
+     *
+     * @param endKey Upper key bound (inclusive) or {@code null} for no bound.
+     */
+    public void endKey(byte[] endKey) {
+        this.endKey = endKey;
+    }
+
+    /**
+     * Sets the revision bound.
+     *
+     * @param rev Revision; entries with a revision less than or equal to it are ignored.
+     */
+    public void revision(long rev) {
+        this.rev = rev;
+    }
+
+    /**
+     * Reports the given entry if it passes the key range and revision filters.
+     *
+     * @param e Updated entry.
+     */
+    public void notify(Entry e) {
+        if (startKey != null && CMP.compare(e.key(), startKey) < 0)
+            return;
+
+        if (endKey != null && CMP.compare(e.key(), endKey) > 0)
+            return;
+
+        if (rev != ANY_REVISION && e.revision() <= rev)
+            return;
+
+        // NOTE(review): value is presumably null for tombstone entries - new String(null) would throw NPE.
+        byte[] val = e.value();
+
+        String valStr = val == null ? "null" : new String(val);
+
+        System.out.println("Entry: key=" + new String(e.key()) + ", value=" + valStr + ", rev=" + e.revision());
+    }
+}
diff --git a/modules/metastorage-server/src/main/java/org/apache/ignite/internal/metastorage/server/Watcher.java b/modules/metastorage-server/src/main/java/org/apache/ignite/internal/metastorage/server/Watcher.java
new file mode 100644
index 0000000..5516d06
--- /dev/null
+++ b/modules/metastorage-server/src/main/java/org/apache/ignite/internal/metastorage/server/Watcher.java
@@ -0,0 +1,13 @@
+package org.apache.ignite.internal.metastorage.server;
+
+import org.jetbrains.annotations.NotNull;
+
+/**
+ * Dispatcher of entry updates to registered {@link Watch} instances.
+ */
+public interface Watcher {
+    /**
+     * Registers a watch.
+     *
+     * @param watch Watch to register.
+     */
+    void register(@NotNull Watch watch);
+
+    /**
+     * Accepts an entry that registered watches should be notified about.
+     *
+     * @param e Entry.
+     */
+    void notify(@NotNull Entry e);
+
+    /**
+     * Cancels a watch.
+     *
+     * @param watch Watch to cancel.
+     */
+    //TODO: implement
+    void cancel(@NotNull Watch watch);
+}
+
diff --git a/modules/metastorage-server/src/main/java/org/apache/ignite/internal/metastorage/server/WatcherImpl.java b/modules/metastorage-server/src/main/java/org/apache/ignite/internal/metastorage/server/WatcherImpl.java
new file mode 100644
index 0000000..dc126a0
--- /dev/null
+++ b/modules/metastorage-server/src/main/java/org/apache/ignite/internal/metastorage/server/WatcherImpl.java
@@ -0,0 +1,58 @@
+package org.apache.ignite.internal.metastorage.server;
+
+import org.jetbrains.annotations.NotNull;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * {@link Watcher} implementation that buffers entries in a queue; a {@link WatcherWorker} drains
+ * the queue and passes every entry to all registered watches.
+ */
+public class WatcherImpl implements Watcher {
+    /** Entries waiting to be delivered to watches. */
+    private final BlockingQueue<Entry> queue = new LinkedBlockingQueue<>();
+
+    /** Registered watches. Guarded by {@link #mux}. */
+    private final List<Watch> watches = new ArrayList<>();
+
+    /** Flag that asks the worker loop to finish. */
+    private volatile boolean stop;
+
+    /** Monitor guarding {@link #watches}. */
+    private final Object mux = new Object();
+
+    /** {@inheritDoc} */
+    @Override public void register(@NotNull Watch watch) {
+        synchronized (mux) {
+            watches.add(watch);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public void notify(@NotNull Entry e) {
+        queue.offer(e);
+    }
+
+    /**
+     * Not supported yet.
+     *
+     * @throws UnsupportedOperationException Always.
+     */
+    @Override
+    public void cancel(@NotNull Watch watch) {
+        throw new UnsupportedOperationException("Not implemented yet.");
+    }
+
+    /**
+     * Asks the worker loop to stop; the loop notices the flag after the current queue poll returns.
+     */
+    public void shutdown() {
+        stop = true;
+    }
+
+    /**
+     * Worker loop that delivers queued entries to watches.
+     * NOTE(review): nothing in this class creates or starts the worker - confirm where it is expected to be run.
+     */
+    private class WatcherWorker implements Runnable {
+        /** {@inheritDoc} */
+        @Override public void run() {
+            while (!stop) {
+                try {
+                    // Bounded wait, so the stop flag is re-checked periodically.
+                    Entry e = queue.poll(100, TimeUnit.MILLISECONDS);
+
+                    if (e != null) {
+                        synchronized (mux) {
+                            watches.forEach(w -> w.notify(e));
+                        }
+                    }
+                }
+                catch (InterruptedException ignored) {
+                    // Interruption is a request to stop: keep the interrupt status for the owner
+                    // of the thread and leave the loop instead of silently continuing.
+                    Thread.currentThread().interrupt();
+
+                    return;
+                }
+            }
+        }
+    }
+}
+
diff --git a/modules/metastorage-server/src/test/java/org/apache/ignite/internal/metastorage/server/SimpleInMemoryKeyValueStorageTest.java b/modules/metastorage-server/src/test/java/org/apache/ignite/internal/metastorage/server/SimpleInMemoryKeyValueStorageTest.java
new file mode 100644
index 0000000..f7fb17e
--- /dev/null
+++ b/modules/metastorage-server/src/test/java/org/apache/ignite/internal/metastorage/server/SimpleInMemoryKeyValueStorageTest.java
@@ -0,0 +1,274 @@
+package org.apache.ignite.internal.metastorage.server;
+
+import org.jetbrains.annotations.NotNull;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.util.Iterator;
+import java.util.Map;
+import java.util.TreeMap;
+
+import static org.junit.jupiter.api.Assertions.assertArrayEquals;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+class SimpleInMemoryKeyValueStorageTest {
+    private KeyValueStorage storage;
+
+    /** Creates a fresh storage with a no-op watcher before every test. */
+    @BeforeEach
+    public void setUp() {
+        Watcher noOpWatcher = new NoOpWatcher();
+
+        storage = new SimpleInMemoryKeyValueStorage(noOpWatcher);
+    }
+
+    /**
+     * Walks two keys through put, get, remove and compact, checking after every step the returned
+     * (previous) entry and the storage revision.
+     */
+    @Test
+    void putGetRemoveCompact() {
+        byte[] key1 = k(1);
+        byte[] val1_1 = kv(1, 1);
+        byte[] val1_3 = kv(1, 3);
+
+        byte[] key2 = k(2);
+        byte[] val2_2 = kv(2, 2);
+
+        assertEquals(0, storage.revision());
+
+        // Previous entry is empty.
+        Entry emptyEntry = storage.put(key1, val1_1);
+
+        assertEquals(1, storage.revision());
+        assertTrue(emptyEntry.empty());
+
+        // Entry with rev == 1.
+        Entry e1_1 = storage.get(key1);
+
+        assertFalse(e1_1.empty());
+        assertFalse(e1_1.tombstone());
+        assertArrayEquals(key1, e1_1.key());
+        assertArrayEquals(val1_1, e1_1.value());
+        assertEquals(1, e1_1.revision());
+        assertEquals(1, storage.revision());
+
+        // Previous entry is empty.
+        emptyEntry = storage.put(key2, val2_2);
+
+        assertEquals(2, storage.revision());
+        assertTrue(emptyEntry.empty());
+
+        // Entry with rev == 2.
+        Entry e2 = storage.get(key2);
+
+        assertFalse(e2.empty());
+        assertFalse(e2.tombstone());
+        assertArrayEquals(key2, e2.key());
+        assertArrayEquals(val2_2, e2.value());
+        assertEquals(2, e2.revision());
+        assertEquals(2, storage.revision());
+
+        // Previous entry is not empty.
+        e1_1 = storage.put(key1, val1_3);
+
+        assertFalse(e1_1.empty());
+        assertFalse(e1_1.tombstone());
+        assertArrayEquals(key1, e1_1.key());
+        assertArrayEquals(val1_1, e1_1.value());
+        assertEquals(1, e1_1.revision());
+        assertEquals(3, storage.revision());
+
+        // Entry with rev == 3.
+        Entry e1_3 = storage.get(key1);
+
+        assertFalse(e1_3.empty());
+        assertFalse(e1_3.tombstone());
+        assertArrayEquals(key1, e1_3.key());
+        assertArrayEquals(val1_3, e1_3.value());
+        assertEquals(3, e1_3.revision());
+        assertEquals(3, storage.revision());
+
+        // Remove existing entry.
+        Entry e2_2 = storage.remove(key2);
+
+        assertFalse(e2_2.empty());
+        assertFalse(e2_2.tombstone());
+        assertArrayEquals(key2, e2_2.key());
+        assertArrayEquals(val2_2, e2_2.value());
+        assertEquals(2, e2_2.revision());
+        assertEquals(4, storage.revision()); // Storage revision is changed.
+
+        // Remove already removed entry.
+        Entry tombstoneEntry = storage.remove(key2);
+
+        assertFalse(tombstoneEntry.empty());
+        assertTrue(tombstoneEntry.tombstone());
+        assertEquals(4, storage.revision()); // Storage revision is not changed.
+
+        // Compact and check that tombstones are removed.
+        storage.compact();
+
+        assertEquals(4, storage.revision());
+        assertTrue(storage.remove(key2).empty());
+        assertTrue(storage.get(key2).empty());
+
+        // Remove existing entry.
+        e1_3 = storage.remove(key1);
+
+        assertFalse(e1_3.empty());
+        assertFalse(e1_3.tombstone());
+        assertArrayEquals(key1, e1_3.key());
+        assertArrayEquals(val1_3, e1_3.value());
+        assertEquals(3, e1_3.revision());
+        assertEquals(5, storage.revision()); // Storage revision is changed.
+
+        // Remove already removed entry.
+        tombstoneEntry = storage.remove(key1);
+
+        assertFalse(tombstoneEntry.empty());
+        assertTrue(tombstoneEntry.tombstone());
+        assertEquals(5, storage.revision()); // Storage revision is not changed.
+
+        // Compact and check that tombstones are removed.
+        storage.compact();
+
+        assertEquals(5, storage.revision());
+        assertTrue(storage.remove(key1).empty());
+        assertTrue(storage.get(key1).empty());
+    }
+
+    /**
+     * Checks that compaction keeps the storage revision, keeps only the last revision of live keys
+     * and drops keys whose last revision is a tombstone.
+     */
+    @Test
+    void compact() {
+        assertEquals(0, storage.revision());
+
+        // Compact empty.
+        storage.compact();
+
+        assertEquals(0, storage.revision());
+
+        // Compact non-empty.
+        // Every fill() call puts the key 'num' times, so the revision grows by 'num'.
+        fill(storage, 1, 1);
+
+        assertEquals(1, storage.revision());
+
+        fill(storage, 2, 2);
+
+        assertEquals(3, storage.revision());
+
+        fill(storage, 3, 3);
+
+        assertEquals(6, storage.revision());
+
+        storage.remove(k(3));
+
+        assertEquals(7, storage.revision());
+        assertTrue(storage.get(k(3)).tombstone());
+
+        storage.compact();
+
+        assertEquals(7, storage.revision());
+
+        Entry e1 = storage.get(k(1));
+
+        assertFalse(e1.empty());
+        assertFalse(e1.tombstone());
+        assertArrayEquals(k(1), e1.key());
+        assertArrayEquals(kv(1,1), e1.value());
+        assertEquals(1, e1.revision());
+
+        Entry e2 = storage.get(k(2));
+
+        assertFalse(e2.empty());
+        assertFalse(e2.tombstone());
+        assertArrayEquals(k(2), e2.key());
+        assertArrayEquals(kv(2,2), e2.value());
+        // The older revision of the key is not available after compaction.
+        assertTrue(storage.get(k(2), 2).empty());
+        assertEquals(3, e2.revision());
+
+        Entry e3 = storage.get(k(3));
+
+        // The removed key is gone completely, whichever revision is requested.
+        assertTrue(e3.empty());
+        assertTrue(storage.get(k(3), 5).empty());
+        assertTrue(storage.get(k(3), 6).empty());
+        assertTrue(storage.get(k(3), 7).empty());
+    }
+
+    /**
+     * Fills the storage with three groups of prefixed keys and checks prefix iteration for each group.
+     */
+    @Test
+    void iterate() {
+        TreeMap<String, String> fooExpected = new TreeMap<>();
+        fill("foo", storage, fooExpected);
+
+        TreeMap<String, String> keyExpected = new TreeMap<>();
+        fill("key", storage, keyExpected);
+
+        TreeMap<String, String> zooExpected = new TreeMap<>();
+        fill("zoo", storage, zooExpected);
+
+        // 3 groups * 100 puts each.
+        assertEquals(300, storage.revision());
+
+        assertIterate("key", storage, keyExpected);
+        assertIterate("zoo", storage, zooExpected);
+        assertIterate("foo", storage, fooExpected);
+    }
+
+    /**
+     * Checks that iteration over keys starting with {@code pref + "_"} returns exactly the expected
+     * entries in the expected order.
+     *
+     * @param pref Key prefix (without the trailing underscore).
+     * @param storage Storage to iterate over.
+     * @param expMap Expected key-value pairs in expected iteration order.
+     */
+    private void assertIterate(String pref,  KeyValueStorage storage, TreeMap<String, String> expMap) {
+        Iterator<Entry> it = storage.iterate((pref + "_").getBytes());
+        Iterator<Map.Entry<String, String>> expIt = expMap.entrySet().iterator();
+
+        // Order.
+        while (it.hasNext()) {
+            assertTrue(expIt.hasNext(), "Storage returned more entries than expected for prefix: " + pref);
+
+            Entry entry = it.next();
+            Map.Entry<String, String> expEntry = expIt.next();
+
+            assertEquals(expEntry.getKey(), new String(entry.key()));
+            assertEquals(expEntry.getValue(), new String(entry.value()));
+        }
+
+        // Completeness: without this check the test passes even if the storage iterator returns nothing.
+        assertFalse(expIt.hasNext(), "Storage returned fewer entries than expected for prefix: " + pref);
+
+        // Range boundaries.
+        it = storage.iterate((pref + '_').getBytes());
+
+        while (it.hasNext()) {
+            Entry entry = it.next();
+
+            assertTrue(expMap.containsKey(new String(entry.key())));
+        }
+    }
+
+    /**
+     * Puts 100 entries {@code <pref>_<i> -> val_<i>} into the storage and records them in the expected map.
+     *
+     * @param pref Key prefix.
+     * @param storage Storage to fill.
+     * @param expMap Map that collects the written key-value pairs as strings.
+     */
+    private static void fill(String pref, KeyValueStorage storage, TreeMap<String, String> expMap) {
+        int idx = 0;
+
+        while (idx < 100) {
+            String entryKey = pref + '_' + idx;
+            String entryVal = "val_" + idx;
+
+            expMap.put(entryKey, entryVal);
+
+            storage.put(entryKey.getBytes(), entryVal.getBytes());
+
+            idx++;
+        }
+    }
+
+    /**
+     * Puts the key {@code k(keySuffix)} into the storage {@code num} times with values
+     * {@code kv(keySuffix, 1)} .. {@code kv(keySuffix, num)}.
+     *
+     * @param storage Storage to fill.
+     * @param keySuffix Key suffix.
+     * @param num Number of puts.
+     */
+    private static void fill(KeyValueStorage storage, int keySuffix, int num) {
+        for (int valSuffix = 1; valSuffix <= num; valSuffix++) {
+            byte[] key = k(keySuffix);
+            byte[] val = kv(keySuffix, valSuffix);
+
+            storage.put(key, val);
+        }
+    }
+
+    /**
+     * Builds key bytes for the given suffix.
+     *
+     * @param k Key suffix.
+     * @return Bytes of the string {@code "key" + k}.
+     */
+    private static byte[] k(int k) {
+        String keyStr = "key" + k;
+
+        return keyStr.getBytes();
+    }
+
+    /**
+     * Builds value bytes for the given key and value suffixes.
+     *
+     * @param k Key suffix.
+     * @param v Value suffix.
+     * @return Bytes of the string {@code "key" + k + "_val" + v}.
+     */
+    private static byte[] kv(int k, int v) {
+        StringBuilder sb = new StringBuilder();
+
+        sb.append("key").append(k).append('_').append("val").append(v);
+
+        return sb.toString().getBytes();
+    }
+
+    /**
+     * Watcher stub whose methods all do nothing.
+     */
+    private static class NoOpWatcher implements Watcher {
+        /** {@inheritDoc} */
+        @Override public void register(@NotNull Watch watch) {
+            // No-op.
+        }
+
+        /** {@inheritDoc} */
+        @Override public void notify(@NotNull Entry e) {
+            // No-op.
+        }
+
+        /** {@inheritDoc} */
+        @Override public void cancel(@NotNull Watch watch) {
+            // No-op.
+        }
+    }
+}
\ No newline at end of file
diff --git a/pom.xml b/pom.xml
index 4e87920..5557667 100644
--- a/pom.xml
+++ b/pom.xml
@@ -43,6 +43,7 @@
         <module>modules/core</module>
         <module>modules/metastorage-client</module>
         <module>modules/metastorage-common</module>
+        <module>modules/metastorage-server</module>
         <module>modules/network</module>
         <module>modules/raft</module>
         <module>modules/raft-client</module>

[ignite-3] 02/03: IGNITE-14398: Meta storage: added update counter

Posted by ag...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

agura pushed a commit to branch ignite-14389
in repository https://gitbox.apache.org/repos/asf/ignite-3.git

commit ea1a97a96ae502ee87a0a912306fdd2f496d5195
Author: Andrey Gura <ag...@apache.org>
AuthorDate: Wed Mar 31 18:18:09 2021 +0300

    IGNITE-14398: Meta storage: added update counter
---
 .../ignite/internal/metastorage/server/Entry.java  | 37 +++++++++---
 .../metastorage/server/KeyValueStorage.java        |  2 +
 .../server/SimpleInMemoryKeyValueStorage.java      | 70 +++++++++++++---------
 .../ignite/internal/metastorage/server/Value.java  | 27 +++++++++
 .../server/SimpleInMemoryKeyValueStorageTest.java  | 29 +++++++++
 5 files changed, 130 insertions(+), 35 deletions(-)

diff --git a/modules/metastorage-server/src/main/java/org/apache/ignite/internal/metastorage/server/Entry.java b/modules/metastorage-server/src/main/java/org/apache/ignite/internal/metastorage/server/Entry.java
index 442aef9..263a88b 100644
--- a/modules/metastorage-server/src/main/java/org/apache/ignite/internal/metastorage/server/Entry.java
+++ b/modules/metastorage-server/src/main/java/org/apache/ignite/internal/metastorage/server/Entry.java
@@ -44,20 +44,31 @@ public class Entry {
     final private long rev;
 
     /**
+     * Update counter corresponds to this particular entry.
+     * <p>
+     *     {@code updCntr == 0} for {@link #empty()} entry,
+     *     {@code updCntr > 0} for regular and {@link #tombstone()} entries.
+     * </p>
+     */
+    final private long updCntr;
+
+    /**
      * Constructor.
      *
      * @param key Key bytes. Couldn't be {@code null}.
      * @param val Value bytes. Couldn't be {@code null}.
      * @param rev Revision.
+     * @param updCntr Update counter.
      */
     // TODO: It seems user will never create Entry, so we can reduce constructor scope to protected or package-private and reuse it from two-place private constructor.
-    public Entry(@NotNull byte[] key, @NotNull byte[] val, long rev) {
+    public Entry(@NotNull byte[] key, @NotNull byte[] val, long rev, long updCntr) {
         assert key != null : "key can't be null";
         assert val != null : "value can't be null";
 
         this.key = key;
         this.val = val;
         this.rev = rev;
+        this.updCntr = updCntr;
     }
 
     /**
@@ -65,13 +76,15 @@ public class Entry {
      *
      * @param key Key bytes. Couldn't be {@code null}.
      * @param rev Revision.
+     * @param updCntr Update counter.
      */
-    private Entry(@NotNull byte[] key, long rev) {
+    private Entry(@NotNull byte[] key, long rev, long updCntr) {
         assert key != null : "key can't be null";
 
         this.key = key;
         this.val = null;
         this.rev = rev;
+        this.updCntr = updCntr;
     }
 
     /**
@@ -82,20 +95,22 @@ public class Entry {
      */
     @NotNull
     public static Entry empty(byte[] key) {
-        return new Entry(key, 0);
+        return new Entry(key, 0, 0);
     }
 
     /**
      * Creates an instance of tombstone entry for a given key and a revision.
      *
      * @param key Key bytes. Couldn't be {@code null}.
+     * @param rev Revision.
+     * @param updCntr Update counter.
      * @return Empty entry.
      */
     @NotNull
-    public static Entry tombstone(byte[] key, long rev) {
+    public static Entry tombstone(byte[] key, long rev, long updCntr) {
         assert rev > 0 : "rev must be positive for tombstone entry.";
 
-        return new Entry(key, rev);
+        return new Entry(key, rev, updCntr);
     }
 
     /**
@@ -127,12 +142,20 @@ public class Entry {
     }
 
     /**
+     * Returns an update counter.
+     * @return Update counter.
+     */
+    public long updateCounter() {
+        return updCntr;
+    }
+
+    /**
      * Returns value which denotes whether entry is tombstone or not.
      *
      * @return {@code True} if entry is tombstone, otherwise - {@code false}.
      */
     public boolean tombstone() {
-        return val == null && rev > 0;
+        return val == null && rev > 0 && updCntr > 0;
     }
 
     /**
@@ -141,6 +164,6 @@ public class Entry {
      * @return {@code True} if entry is empty, otherwise - {@code false}.
      */
     public boolean empty() {
-        return val == null && rev == 0;
+        return val == null && rev == 0 && updCntr == 0;
     }
 }
diff --git a/modules/metastorage-server/src/main/java/org/apache/ignite/internal/metastorage/server/KeyValueStorage.java b/modules/metastorage-server/src/main/java/org/apache/ignite/internal/metastorage/server/KeyValueStorage.java
index 1bf6b78..e245e08 100644
--- a/modules/metastorage-server/src/main/java/org/apache/ignite/internal/metastorage/server/KeyValueStorage.java
+++ b/modules/metastorage-server/src/main/java/org/apache/ignite/internal/metastorage/server/KeyValueStorage.java
@@ -8,6 +8,8 @@ public interface KeyValueStorage {
 
     long revision();
 
+    long updateCounter();
+
     @NotNull
     Entry put(byte[] key, byte[] value);
 
diff --git a/modules/metastorage-server/src/main/java/org/apache/ignite/internal/metastorage/server/SimpleInMemoryKeyValueStorage.java b/modules/metastorage-server/src/main/java/org/apache/ignite/internal/metastorage/server/SimpleInMemoryKeyValueStorage.java
index 9059aec..b764998 100644
--- a/modules/metastorage-server/src/main/java/org/apache/ignite/internal/metastorage/server/SimpleInMemoryKeyValueStorage.java
+++ b/modules/metastorage-server/src/main/java/org/apache/ignite/internal/metastorage/server/SimpleInMemoryKeyValueStorage.java
@@ -12,23 +12,25 @@ import java.util.TreeMap;
 import org.jetbrains.annotations.NotNull;
 import org.jetbrains.annotations.TestOnly;
 
+import static org.apache.ignite.internal.metastorage.server.Value.TOMBSTONE;
+
 /**
  * WARNING: Only for test purposes and only for non-distributed (one static instance) storage.
  */
 public class SimpleInMemoryKeyValueStorage implements KeyValueStorage {
     private static final Comparator<byte[]> LEXICOGRAPHIC_COMPARATOR = Arrays::compare;
 
-    private static final byte[] TOMBSTONE = new byte[0];
-
     private static final long LATEST_REV = -1;
 
     private final Watcher watcher;
 
     private NavigableMap<byte[], List<Long>> keysIdx = new TreeMap<>(LEXICOGRAPHIC_COMPARATOR);
 
-    private NavigableMap<Long, NavigableMap<byte[], byte[]>> revsIdx = new TreeMap<>();
+    private NavigableMap<Long, NavigableMap<byte[], Value>> revsIdx = new TreeMap<>();
 
-    private long grev = 0;
+    private long rev;
+
+    private long updCntr;
 
     private final Object mux = new Object();
 
@@ -37,35 +39,45 @@ public class SimpleInMemoryKeyValueStorage implements KeyValueStorage {
     }
 
     @Override public long revision() {
-        return grev;
+        return rev;
+    }
+
+    @Override public long updateCounter() {
+        return updCntr;
     }
 
     @NotNull
-    @Override public Entry put(byte[] key, byte[] val) {
+    @Override public Entry put(byte[] key, byte[] bytes) {
         synchronized (mux) {
-            long crev = ++grev;
+            long curRev = ++rev;
+
+            long curUpdCntr = ++updCntr;
 
             // Update keysIdx.
             List<Long> revs = keysIdx.computeIfAbsent(key, k -> new ArrayList<>());
 
-            long lrev = revs.isEmpty() ? 0 : lastRevision(revs);
+            long lastRev = revs.isEmpty() ? 0 : lastRevision(revs);
 
-            revs.add(crev);
+            revs.add(curRev);
 
             // Update revsIdx.
-            NavigableMap<byte[], byte[]> entries = new TreeMap<>(LEXICOGRAPHIC_COMPARATOR);
+            NavigableMap<byte[], Value> entries = new TreeMap<>(LEXICOGRAPHIC_COMPARATOR);
+
+            Value val = new Value(bytes, curUpdCntr);
 
             entries.put(key, val);
 
-            revsIdx.put(crev, entries);
+            revsIdx.put(curRev, entries);
 
             // Return previous value.
-            if (lrev == 0)
+            if (lastRev == 0)
                 return Entry.empty(key);
 
-            NavigableMap<byte[], byte[]> lastVal = revsIdx.get(lrev);
+            NavigableMap<byte[], Value> lastRevVals = revsIdx.get(lastRev);
+
+            Value lastVal = lastRevVals.get(key);
 
-            Entry res = new Entry(key, lastVal.get(key), lrev);
+            Entry res = new Entry(key, lastVal.bytes(), lastRev, lastVal.updateCounter());
 
             //TODO: notify watchers
 
@@ -158,16 +170,18 @@ public class SimpleInMemoryKeyValueStorage implements KeyValueStorage {
                             //return new AbstractMap.SimpleImmutableEntry<>(key, null);
                         }
 
-                        NavigableMap<byte[], byte[]> vals = revsIdx.get(rev);
+                        NavigableMap<byte[], Value> vals = revsIdx.get(rev);
 
                         if (vals == null || vals.isEmpty()) {
                             throw new IllegalStateException("vals == null || vals.isEmpty()");
                             //return new AbstractMap.SimpleImmutableEntry<>(key, null);
                         }
 
-                        byte[] val = vals.get(key);
+                        Value val = vals.get(key);
 
-                        return val == TOMBSTONE ? Entry.tombstone(key, rev) : new Entry(key, val, rev);
+                        return val.tombstone() ?
+                                Entry.tombstone(key, rev, val.updateCounter()) :
+                                new Entry(key, val.bytes(), rev, val.updateCounter());
                     }
                 }
             };
@@ -178,7 +192,7 @@ public class SimpleInMemoryKeyValueStorage implements KeyValueStorage {
         synchronized (mux) {
             NavigableMap<byte[], List<Long>> compactedKeysIdx = new TreeMap<>(LEXICOGRAPHIC_COMPARATOR);
 
-            NavigableMap<Long, NavigableMap<byte[], byte[]>> compactedRevsIdx = new TreeMap<>();
+            NavigableMap<Long, NavigableMap<byte[], Value>> compactedRevsIdx = new TreeMap<>();
 
             keysIdx.forEach((key, revs) -> compactForKey(key, revs, compactedKeysIdx, compactedRevsIdx));
 
@@ -192,18 +206,18 @@ public class SimpleInMemoryKeyValueStorage implements KeyValueStorage {
             byte[] key,
             List<Long> revs,
             NavigableMap<byte[], List<Long>> compactedKeysIdx,
-            NavigableMap<Long, NavigableMap<byte[], byte[]>> compactedRevsIdx
+            NavigableMap<Long, NavigableMap<byte[], Value>> compactedRevsIdx
     ) {
         Long lrev = lastRevision(revs);
 
-        NavigableMap<byte[], byte[]> kv = revsIdx.get(lrev);
+        NavigableMap<byte[], Value> kv = revsIdx.get(lrev);
 
-        byte[] lastVal = kv.get(key);
+        Value lastVal = kv.get(key);
 
-        if (lastVal != TOMBSTONE) {
+        if (!lastVal.tombstone()) {
             compactedKeysIdx.put(key, listOf(lrev));
 
-            NavigableMap<byte[], byte[]> compactedKv = compactedRevsIdx.computeIfAbsent(
+            NavigableMap<byte[], Value> compactedKv = compactedRevsIdx.computeIfAbsent(
                     lrev,
                     k -> new TreeMap<>(LEXICOGRAPHIC_COMPARATOR)
             );
@@ -227,17 +241,17 @@ public class SimpleInMemoryKeyValueStorage implements KeyValueStorage {
 
         long lrev = rev == LATEST_REV ? lastRevision(revs) : rev;
 
-        NavigableMap<byte[], byte[]> entries = revsIdx.get(lrev);
+        NavigableMap<byte[], Value> entries = revsIdx.get(lrev);
 
         if (entries == null || entries.isEmpty())
             return Entry.empty(key);
 
-        byte[] val = entries.get(key);
+        Value val = entries.get(key);
 
-        if (val == TOMBSTONE)
-            return Entry.tombstone(key, lrev);
+        if (val.tombstone())
+            return Entry.tombstone(key, lrev, val.updateCounter());
 
-        return new Entry(key, val , lrev);
+        return new Entry(key, val.bytes() , lrev, val.updateCounter());
     }
 
     private static boolean isPrefix(byte[] pref, byte[] term) {
diff --git a/modules/metastorage-server/src/main/java/org/apache/ignite/internal/metastorage/server/Value.java b/modules/metastorage-server/src/main/java/org/apache/ignite/internal/metastorage/server/Value.java
new file mode 100644
index 0000000..250a5ea
--- /dev/null
+++ b/modules/metastorage-server/src/main/java/org/apache/ignite/internal/metastorage/server/Value.java
@@ -0,0 +1,27 @@
+package org.apache.ignite.internal.metastorage.server;
+
+import org.jetbrains.annotations.NotNull;
+
+/**
+ * Value holder: raw bytes plus the update counter passed at construction.
+ */
+public class Value {
+    /** Marker byte array of a tombstone value; it is recognized by reference, not by content. */
+    public static final byte[] TOMBSTONE = new byte[0];
+
+    /** Value bytes. */
+    private final byte[] bytes;
+
+    /** Update counter. */
+    private final long updCntr;
+
+    /**
+     * Constructor.
+     *
+     * @param bytes Value bytes.
+     * @param updCntr Update counter.
+     */
+    public Value(@NotNull byte[] bytes, long updCntr) {
+        this.updCntr = updCntr;
+        this.bytes = bytes;
+    }
+
+    /**
+     * Returns value which denotes whether this value is the tombstone marker or not.
+     *
+     * @return {@code True} if the bytes are the {@link #TOMBSTONE} instance, otherwise - {@code false}.
+     */
+    boolean tombstone() {
+        return TOMBSTONE == bytes;
+    }
+
+    /**
+     * Returns an update counter.
+     *
+     * @return Update counter.
+     */
+    public long updateCounter() {
+        return updCntr;
+    }
+
+    /**
+     * Returns value bytes.
+     *
+     * @return Value bytes.
+     */
+    public byte[] bytes() {
+        return bytes;
+    }
+}
diff --git a/modules/metastorage-server/src/test/java/org/apache/ignite/internal/metastorage/server/SimpleInMemoryKeyValueStorageTest.java b/modules/metastorage-server/src/test/java/org/apache/ignite/internal/metastorage/server/SimpleInMemoryKeyValueStorageTest.java
index f7fb17e..eae76fd 100644
--- a/modules/metastorage-server/src/test/java/org/apache/ignite/internal/metastorage/server/SimpleInMemoryKeyValueStorageTest.java
+++ b/modules/metastorage-server/src/test/java/org/apache/ignite/internal/metastorage/server/SimpleInMemoryKeyValueStorageTest.java
@@ -31,11 +31,13 @@ class SimpleInMemoryKeyValueStorageTest {
         byte[] val2_2 = kv(2, 2);
 
         assertEquals(0, storage.revision());
+        assertEquals(0, storage.updateCounter());
 
         // Previous entry is empty.
         Entry emptyEntry = storage.put(key1, val1_1);
 
         assertEquals(1, storage.revision());
+        assertEquals(1, storage.updateCounter());
         assertTrue(emptyEntry.empty());
 
         // Entry with rev == 1.
@@ -46,12 +48,15 @@ class SimpleInMemoryKeyValueStorageTest {
         assertArrayEquals(key1, e1_1.key());
         assertArrayEquals(val1_1, e1_1.value());
         assertEquals(1, e1_1.revision());
+        assertEquals(1, e1_1.updateCounter());
         assertEquals(1, storage.revision());
+        assertEquals(1, storage.updateCounter());
 
         // Previous entry is empty.
         emptyEntry = storage.put(key2, val2_2);
 
         assertEquals(2, storage.revision());
+        assertEquals(2, storage.updateCounter());
         assertTrue(emptyEntry.empty());
 
         // Entry with rev == 2.
@@ -62,7 +67,9 @@ class SimpleInMemoryKeyValueStorageTest {
         assertArrayEquals(key2, e2.key());
         assertArrayEquals(val2_2, e2.value());
         assertEquals(2, e2.revision());
+        assertEquals(2, e2.updateCounter());
         assertEquals(2, storage.revision());
+        assertEquals(2, storage.updateCounter());
 
         // Previous entry is not empty.
         e1_1 = storage.put(key1, val1_3);
@@ -72,7 +79,9 @@ class SimpleInMemoryKeyValueStorageTest {
         assertArrayEquals(key1, e1_1.key());
         assertArrayEquals(val1_1, e1_1.value());
         assertEquals(1, e1_1.revision());
+        assertEquals(1, e1_1.updateCounter());
         assertEquals(3, storage.revision());
+        assertEquals(3, storage.updateCounter());
 
         // Entry with rev == 3.
         Entry e1_3 = storage.get(key1);
@@ -82,7 +91,9 @@ class SimpleInMemoryKeyValueStorageTest {
         assertArrayEquals(key1, e1_3.key());
         assertArrayEquals(val1_3, e1_3.value());
         assertEquals(3, e1_3.revision());
+        assertEquals(3, e1_3.updateCounter());
         assertEquals(3, storage.revision());
+        assertEquals(3, storage.updateCounter());
 
         // Remove existing entry.
         Entry e2_2 = storage.remove(key2);
@@ -92,7 +103,9 @@ class SimpleInMemoryKeyValueStorageTest {
         assertArrayEquals(key2, e2_2.key());
         assertArrayEquals(val2_2, e2_2.value());
         assertEquals(2, e2_2.revision());
+        assertEquals(2, e2_2.updateCounter());
         assertEquals(4, storage.revision()); // Storage revision is changed.
+        assertEquals(4, storage.updateCounter());
 
         // Remove already removed entry.
         Entry tombstoneEntry = storage.remove(key2);
@@ -100,11 +113,13 @@ class SimpleInMemoryKeyValueStorageTest {
         assertFalse(tombstoneEntry.empty());
         assertTrue(tombstoneEntry.tombstone());
         assertEquals(4, storage.revision()); // Storage revision is not changed.
+        assertEquals(4, storage.updateCounter());
 
         // Compact and check that tombstones are removed.
         storage.compact();
 
         assertEquals(4, storage.revision());
+        assertEquals(4, storage.updateCounter());
         assertTrue(storage.remove(key2).empty());
         assertTrue(storage.get(key2).empty());
 
@@ -116,7 +131,9 @@ class SimpleInMemoryKeyValueStorageTest {
         assertArrayEquals(key1, e1_3.key());
         assertArrayEquals(val1_3, e1_3.value());
         assertEquals(3, e1_3.revision());
+        assertEquals(3, e1_3.updateCounter());
         assertEquals(5, storage.revision()); // Storage revision is changed.
+        assertEquals(5, storage.updateCounter());
 
         // Remove already removed entry.
         tombstoneEntry = storage.remove(key1);
@@ -124,11 +141,13 @@ class SimpleInMemoryKeyValueStorageTest {
         assertFalse(tombstoneEntry.empty());
         assertTrue(tombstoneEntry.tombstone());
         assertEquals(5, storage.revision()); // // Storage revision is not changed.
+        assertEquals(5, storage.updateCounter());
 
         // Compact and check that tombstones are removed.
         storage.compact();
 
         assertEquals(5, storage.revision());
+        assertEquals(5, storage.updateCounter());
         assertTrue(storage.remove(key1).empty());
         assertTrue(storage.get(key1).empty());
     }
@@ -136,33 +155,40 @@ class SimpleInMemoryKeyValueStorageTest {
     @Test
     void compact() {
         assertEquals(0, storage.revision());
+        assertEquals(0, storage.updateCounter());
 
         // Compact empty.
         storage.compact();
 
         assertEquals(0, storage.revision());
+        assertEquals(0, storage.updateCounter());
 
         // Compact non-empty.
         fill(storage, 1, 1);
 
         assertEquals(1, storage.revision());
+        assertEquals(1, storage.updateCounter());
 
         fill(storage, 2, 2);
 
         assertEquals(3, storage.revision());
+        assertEquals(3, storage.updateCounter());
 
         fill(storage, 3, 3);
 
         assertEquals(6, storage.revision());
+        assertEquals(6, storage.updateCounter());
 
         storage.remove(k(3));
 
         assertEquals(7, storage.revision());
+        assertEquals(7, storage.updateCounter());
         assertTrue(storage.get(k(3)).tombstone());
 
         storage.compact();
 
         assertEquals(7, storage.revision());
+        assertEquals(7, storage.updateCounter());
 
         Entry e1 = storage.get(k(1));
 
@@ -171,6 +197,7 @@ class SimpleInMemoryKeyValueStorageTest {
         assertArrayEquals(k(1), e1.key());
         assertArrayEquals(kv(1,1), e1.value());
         assertEquals(1, e1.revision());
+        assertEquals(1, e1.updateCounter());
 
         Entry e2 = storage.get(k(2));
 
@@ -180,6 +207,7 @@ class SimpleInMemoryKeyValueStorageTest {
         assertArrayEquals(kv(2,2), e2.value());
         assertTrue(storage.get(k(2), 2).empty());
         assertEquals(3, e2.revision());
+        assertEquals(3, e2.updateCounter());
 
         Entry e3 = storage.get(k(3));
 
@@ -200,6 +228,7 @@ class SimpleInMemoryKeyValueStorageTest {
         fill("zoo", storage, expZooMap);
 
         assertEquals(300, storage.revision());
+        assertEquals(300, storage.updateCounter());
 
         assertIterate("key", storage, expKeyMap);
         assertIterate("zoo", storage, expZooMap);

[ignite-3] 03/03: IGNITE-14389 Added get and do smth semantic

Posted by ag...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

agura pushed a commit to branch ignite-14389
in repository https://gitbox.apache.org/repos/asf/ignite-3.git

commit b88359dafab6c2ecb15ee9114a37976a46d6b5e4
Author: Andrey Gura <ag...@apache.org>
AuthorDate: Tue Apr 6 22:25:03 2021 +0300

    IGNITE-14389 Added get and do smth semantic
---
 .../metastorage/server/KeyValueStorage.java        |  12 +-
 .../server/SimpleInMemoryKeyValueStorage.java      | 147 +++++++++----
 .../server/SimpleInMemoryKeyValueStorageTest.java  | 235 +++++++++++++++++++--
 3 files changed, 329 insertions(+), 65 deletions(-)

diff --git a/modules/metastorage-server/src/main/java/org/apache/ignite/internal/metastorage/server/KeyValueStorage.java b/modules/metastorage-server/src/main/java/org/apache/ignite/internal/metastorage/server/KeyValueStorage.java
index e245e08..ead1043 100644
--- a/modules/metastorage-server/src/main/java/org/apache/ignite/internal/metastorage/server/KeyValueStorage.java
+++ b/modules/metastorage-server/src/main/java/org/apache/ignite/internal/metastorage/server/KeyValueStorage.java
@@ -11,16 +11,20 @@ public interface KeyValueStorage {
     long updateCounter();
 
     @NotNull
-    Entry put(byte[] key, byte[] value);
-
-    @NotNull
     Entry get(byte[] key);
 
     @NotNull
     Entry get(byte[] key, long rev);
 
+    void put(byte[] key, byte[] value);
+
+    @NotNull
+    Entry getAndPut(byte[] key, byte[] value);
+
+    void remove(byte[] key);
+
     @NotNull
-    Entry remove(byte[] key);
+    Entry getAndRemove(byte[] key);
 
     Iterator<Entry> iterate(byte[] key);
 
diff --git a/modules/metastorage-server/src/main/java/org/apache/ignite/internal/metastorage/server/SimpleInMemoryKeyValueStorage.java b/modules/metastorage-server/src/main/java/org/apache/ignite/internal/metastorage/server/SimpleInMemoryKeyValueStorage.java
index b764998..3700f4a 100644
--- a/modules/metastorage-server/src/main/java/org/apache/ignite/internal/metastorage/server/SimpleInMemoryKeyValueStorage.java
+++ b/modules/metastorage-server/src/main/java/org/apache/ignite/internal/metastorage/server/SimpleInMemoryKeyValueStorage.java
@@ -46,49 +46,26 @@ public class SimpleInMemoryKeyValueStorage implements KeyValueStorage {
         return updCntr;
     }
 
-    @NotNull
-    @Override public Entry put(byte[] key, byte[] bytes) {
+    @Override public void put(byte[] key, byte[] value) {
         synchronized (mux) {
-            long curRev = ++rev;
-
-            long curUpdCntr = ++updCntr;
-
-            // Update keysIdx.
-            List<Long> revs = keysIdx.computeIfAbsent(key, k -> new ArrayList<>());
-
-            long lastRev = revs.isEmpty() ? 0 : lastRevision(revs);
-
-            revs.add(curRev);
-
-            // Update revsIdx.
-            NavigableMap<byte[], Value> entries = new TreeMap<>(LEXICOGRAPHIC_COMPARATOR);
-
-            Value val = new Value(bytes, curUpdCntr);
-
-            entries.put(key, val);
+            doPut(key, value);
+        }
+    }
 
-            revsIdx.put(curRev, entries);
+    @NotNull
+    @Override public Entry getAndPut(byte[] key, byte[] bytes) {
+        synchronized (mux) {
+            long lastRev = doPut(key, bytes);
 
             // Return previous value.
-            if (lastRev == 0)
-                return Entry.empty(key);
-
-            NavigableMap<byte[], Value> lastRevVals = revsIdx.get(lastRev);
-
-            Value lastVal = lastRevVals.get(key);
-
-            Entry res = new Entry(key, lastVal.bytes(), lastRev, lastVal.updateCounter());
-
-            //TODO: notify watchers
-
-            return res;
+            return doGetValue(key, lastRev);
         }
     }
 
     @NotNull
     @Override public Entry get(byte[] key) {
         synchronized (mux) {
-            return doGet(key, LATEST_REV);
+            return doGet(key, LATEST_REV, false);
         }
     }
 
@@ -96,19 +73,31 @@ public class SimpleInMemoryKeyValueStorage implements KeyValueStorage {
     @TestOnly
     @Override public Entry get(byte[] key, long rev) {
         synchronized (mux) {
-            return doGet(key, rev);
+            return doGet(key, rev, true);
+        }
+    }
+
+    @Override
+    public void remove(byte[] key) {
+        synchronized (mux) {
+            Entry e = doGet(key, LATEST_REV, false);
+
+            if (e.empty() || e.tombstone())
+                return;
+
+            doPut(key, TOMBSTONE);
         }
     }
 
     @NotNull
-    @Override public Entry remove(byte[] key) {
+    @Override public Entry getAndRemove(byte[] key) {
         synchronized (mux) {
-            Entry e = doGet(key, LATEST_REV);
+            Entry e = doGet(key, LATEST_REV, false);
 
-            if (e.value() == null)
+            if (e.empty() || e.tombstone())
                 return e;
 
-            return put(key, TOMBSTONE);
+            return getAndPut(key, TOMBSTONE);
         }
     }
 
@@ -233,25 +222,91 @@ public class SimpleInMemoryKeyValueStorage implements KeyValueStorage {
      * @param rev Revision.
      * @return Entry for given key.
      */
-    @NotNull private Entry doGet(byte[] key, long rev) {
+    @NotNull
+    private Entry doGet(byte[] key, long rev, boolean exactRev) {
+        assert rev == LATEST_REV && !exactRev || rev > LATEST_REV :
+                "Invalid arguments: [rev=" + rev + ", exactRev=" + exactRev + ']';
+
         List<Long> revs = keysIdx.get(key);
 
         if (revs == null || revs.isEmpty())
             return Entry.empty(key);
 
-        long lrev = rev == LATEST_REV ? lastRevision(revs) : rev;
+        long lastRev;
 
-        NavigableMap<byte[], Value> entries = revsIdx.get(lrev);
+        if (rev == LATEST_REV)
+            lastRev = lastRevision(revs);
+        else
+            lastRev = exactRev ? rev : maxRevision(revs, rev);
 
-        if (entries == null || entries.isEmpty())
+        // lastRev can be -1 if maxRevision returns -1.
+        if (lastRev == -1)
             return Entry.empty(key);
 
-        Value val = entries.get(key);
+        return doGetValue(key, lastRev);
+    }
+
+    /**
+     * Returns the maximum revision that is less than or equal to {@code upperBoundRev}. If there is no such revision
+     * then {@code -1} will be returned.
+     *
+     * @param revs Revisions list.
+     * @param upperBoundRev Revision upper bound.
+     * @return Appropriate revision or {@code -1} if there is no such revision.
+     */
+    private static long maxRevision(List<Long> revs, long upperBoundRev) {
+        int i = revs.size() - 1;
+
+        for (; i >= 0 ; i--) {
+            long rev = revs.get(i);
+
+            if (rev <= upperBoundRev)
+                return rev;
+        }
+
+        return -1;
+    }
+
+    @NotNull
+    private Entry doGetValue(byte[] key, long lastRev) {
+        if (lastRev == 0)
+            return Entry.empty(key);
+
+        NavigableMap<byte[], Value> lastRevVals = revsIdx.get(lastRev);
+
+        if (lastRevVals == null || lastRevVals.isEmpty())
+            return Entry.empty(key);
+
+        Value lastVal = lastRevVals.get(key);
+
+        if (lastVal.tombstone())
+            return Entry.tombstone(key, lastRev, lastVal.updateCounter());
+
+        return new Entry(key, lastVal.bytes() , lastRev, lastVal.updateCounter());
+    }
+
+    private long doPut(byte[] key, byte[] bytes) {
+        long curRev = ++rev;
+
+        long curUpdCntr = ++updCntr;
+
+        // Update keysIdx.
+        List<Long> revs = keysIdx.computeIfAbsent(key, k -> new ArrayList<>());
+
+        long lastRev = revs.isEmpty() ? 0 : lastRevision(revs);
+
+        revs.add(curRev);
+
+        // Update revsIdx.
+        NavigableMap<byte[], Value> entries = new TreeMap<>(LEXICOGRAPHIC_COMPARATOR);
+
+        Value val = new Value(bytes, curUpdCntr);
+
+        entries.put(key, val);
 
-        if (val.tombstone())
-            return Entry.tombstone(key, lrev, val.updateCounter());
+        revsIdx.put(curRev, entries);
 
-        return new Entry(key, val.bytes() , lrev, val.updateCounter());
+        return lastRev;
     }
 
     private static boolean isPrefix(byte[] pref, byte[] term) {
diff --git a/modules/metastorage-server/src/test/java/org/apache/ignite/internal/metastorage/server/SimpleInMemoryKeyValueStorageTest.java b/modules/metastorage-server/src/test/java/org/apache/ignite/internal/metastorage/server/SimpleInMemoryKeyValueStorageTest.java
index eae76fd..5b797fc 100644
--- a/modules/metastorage-server/src/test/java/org/apache/ignite/internal/metastorage/server/SimpleInMemoryKeyValueStorageTest.java
+++ b/modules/metastorage-server/src/test/java/org/apache/ignite/internal/metastorage/server/SimpleInMemoryKeyValueStorageTest.java
@@ -22,7 +22,212 @@ class SimpleInMemoryKeyValueStorageTest {
     }
 
     @Test
-    void putGetRemoveCompact() {
+    public void put() {
+        byte[] key = k(1);
+        byte[] val = kv(1, 1);
+
+        assertEquals(0, storage.revision());
+        assertEquals(0, storage.updateCounter());
+        assertTrue(storage.get(key).empty());
+
+        storage.put(key, val);
+
+        assertEquals(1, storage.revision());
+        assertEquals(1, storage.updateCounter());
+
+        Entry e = storage.get(key);
+
+        assertFalse(e.empty());
+        assertFalse(e.tombstone());
+        assertEquals(1, e.revision());
+        assertEquals(1, e.updateCounter());
+
+        storage.put(key, val);
+
+        assertEquals(2, storage.revision());
+        assertEquals(2, storage.updateCounter());
+
+        e = storage.get(key);
+
+        assertFalse(e.empty());
+        assertFalse(e.tombstone());
+        assertEquals(2, e.revision());
+        assertEquals(2, e.updateCounter());
+    }
+
+    @Test
+    public void getAndPut() {
+        byte[] key = k(1);
+        byte[] val = kv(1, 1);
+
+        assertEquals(0, storage.revision());
+        assertEquals(0, storage.updateCounter());
+        assertTrue(storage.get(key).empty());
+
+        Entry e = storage.getAndPut(key, val);
+
+        assertEquals(1, storage.revision());
+        assertEquals(1, storage.updateCounter());
+        assertTrue(e.empty());
+        assertFalse(e.tombstone());
+        assertEquals(0, e.revision());
+        assertEquals(0, e.updateCounter());
+
+        e = storage.getAndPut(key, val);
+
+        assertEquals(2, storage.revision());
+        assertEquals(2, storage.updateCounter());
+        assertFalse(e.empty());
+        assertFalse(e.tombstone());
+        assertEquals(1, e.revision());
+        assertEquals(1, e.updateCounter());
+    }
+
+    @Test
+    public void remove() {
+        byte[] key = k(1);
+        byte[] val = kv(1, 1);
+
+        assertEquals(0, storage.revision());
+        assertEquals(0, storage.updateCounter());
+        assertTrue(storage.get(key).empty());
+
+        // Remove non-existent entry.
+        storage.remove(key);
+
+        assertEquals(0, storage.revision());
+        assertEquals(0, storage.updateCounter());
+        assertTrue(storage.get(key).empty());
+
+        storage.put(key, val);
+
+        assertEquals(1, storage.revision());
+        assertEquals(1, storage.updateCounter());
+
+        // Remove existent entry.
+        storage.remove(key);
+
+        assertEquals(2, storage.revision());
+        assertEquals(2, storage.updateCounter());
+
+        Entry e = storage.get(key);
+
+        assertFalse(e.empty());
+        assertTrue(e.tombstone());
+        assertEquals(2, e.revision());
+        assertEquals(2, e.updateCounter());
+
+        // Remove already removed entry (tombstone can't be removed).
+        storage.remove(key);
+
+        assertEquals(2, storage.revision());
+        assertEquals(2, storage.updateCounter());
+
+        e = storage.get(key);
+
+        assertFalse(e.empty());
+        assertTrue(e.tombstone());
+        assertEquals(2, e.revision());
+        assertEquals(2, e.updateCounter());
+    }
+
+    @Test
+    public void getAndRemove() {
+        byte[] key = k(1);
+        byte[] val = kv(1, 1);
+
+        assertEquals(0, storage.revision());
+        assertEquals(0, storage.updateCounter());
+        assertTrue(storage.get(key).empty());
+
+        // Remove non-existent entry.
+        Entry e = storage.getAndRemove(key);
+
+        assertTrue(e.empty());
+        assertEquals(0, storage.revision());
+        assertEquals(0, storage.updateCounter());
+        assertTrue(storage.get(key).empty());
+
+        storage.put(key, val);
+
+        assertEquals(1, storage.revision());
+        assertEquals(1, storage.updateCounter());
+
+        // Remove existent entry.
+        e = storage.getAndRemove(key);
+
+        assertFalse(e.empty());
+        assertFalse(e.tombstone());
+        assertEquals(1, e.revision());
+        assertEquals(1, e.updateCounter());
+        assertEquals(2, storage.revision());
+        assertEquals(2, storage.updateCounter());
+
+        e = storage.get(key);
+
+        assertFalse(e.empty());
+        assertTrue(e.tombstone());
+        assertEquals(2, e.revision());
+        assertEquals(2, e.updateCounter());
+
+        // Remove already removed entry (tombstone can't be removed).
+        e = storage.getAndRemove(key);
+
+        assertFalse(e.empty());
+        assertTrue(e.tombstone());
+        assertEquals(2, e.revision());
+        assertEquals(2, e.updateCounter());
+        assertEquals(2, storage.revision());
+        assertEquals(2, storage.updateCounter());
+
+        e = storage.get(key);
+
+        assertFalse(e.empty());
+        assertTrue(e.tombstone());
+        assertEquals(2, e.revision());
+        assertEquals(2, e.updateCounter());
+    }
+
+    @Test
+    public void getAfterRemove() {
+        byte[] key = k(1);
+        byte[] val = kv(1, 1);
+
+        storage.getAndPut(key, val);
+
+        storage.getAndRemove(key);
+
+        Entry e = storage.get(key);
+
+        assertEquals(2, storage.revision());
+        assertEquals(2, storage.updateCounter());
+        assertEquals(2, e.revision());
+        assertTrue(e.tombstone());
+    }
+
+    @Test
+    public void getAndPutAfterRemove() {
+        byte[] key = k(1);
+
+        byte[] val = kv(1, 1);
+
+        storage.getAndPut(key, val);
+
+        storage.getAndRemove(key);
+
+        Entry e = storage.getAndPut(key, val);
+
+        assertEquals(3, storage.revision());
+
+        assertEquals(3, storage.updateCounter());
+
+        assertEquals(2, e.revision());
+
+        assertTrue(e.tombstone());
+    }
+
+    @Test
+    public void putGetRemoveCompact() {
         byte[] key1 = k(1);
         byte[] val1_1 = kv(1, 1);
         byte[] val1_3 = kv(1, 3);
@@ -34,7 +239,7 @@ class SimpleInMemoryKeyValueStorageTest {
         assertEquals(0, storage.updateCounter());
 
         // Previous entry is empty.
-        Entry emptyEntry = storage.put(key1, val1_1);
+        Entry emptyEntry = storage.getAndPut(key1, val1_1);
 
         assertEquals(1, storage.revision());
         assertEquals(1, storage.updateCounter());
@@ -53,7 +258,7 @@ class SimpleInMemoryKeyValueStorageTest {
         assertEquals(1, storage.updateCounter());
 
         // Previous entry is empty.
-        emptyEntry = storage.put(key2, val2_2);
+        emptyEntry = storage.getAndPut(key2, val2_2);
 
         assertEquals(2, storage.revision());
         assertEquals(2, storage.updateCounter());
@@ -72,7 +277,7 @@ class SimpleInMemoryKeyValueStorageTest {
         assertEquals(2, storage.updateCounter());
 
         // Previous entry is not empty.
-        e1_1 = storage.put(key1, val1_3);
+        e1_1 = storage.getAndPut(key1, val1_3);
 
         assertFalse(e1_1.empty());
         assertFalse(e1_1.tombstone());
@@ -96,7 +301,7 @@ class SimpleInMemoryKeyValueStorageTest {
         assertEquals(3, storage.updateCounter());
 
         // Remove existing entry.
-        Entry e2_2 = storage.remove(key2);
+        Entry e2_2 = storage.getAndRemove(key2);
 
         assertFalse(e2_2.empty());
         assertFalse(e2_2.tombstone());
@@ -108,7 +313,7 @@ class SimpleInMemoryKeyValueStorageTest {
         assertEquals(4, storage.updateCounter());
 
         // Remove already removed entry.
-        Entry tombstoneEntry = storage.remove(key2);
+        Entry tombstoneEntry = storage.getAndRemove(key2);
 
         assertFalse(tombstoneEntry.empty());
         assertTrue(tombstoneEntry.tombstone());
@@ -120,11 +325,11 @@ class SimpleInMemoryKeyValueStorageTest {
 
         assertEquals(4, storage.revision());
         assertEquals(4, storage.updateCounter());
-        assertTrue(storage.remove(key2).empty());
+        assertTrue(storage.getAndRemove(key2).empty());
         assertTrue(storage.get(key2).empty());
 
         // Remove existing entry.
-        e1_3 = storage.remove(key1);
+        e1_3 = storage.getAndRemove(key1);
 
         assertFalse(e1_3.empty());
         assertFalse(e1_3.tombstone());
@@ -136,7 +341,7 @@ class SimpleInMemoryKeyValueStorageTest {
         assertEquals(5, storage.updateCounter());
 
         // Remove already removed entry.
-        tombstoneEntry = storage.remove(key1);
+        tombstoneEntry = storage.getAndRemove(key1);
 
         assertFalse(tombstoneEntry.empty());
         assertTrue(tombstoneEntry.tombstone());
@@ -148,12 +353,12 @@ class SimpleInMemoryKeyValueStorageTest {
 
         assertEquals(5, storage.revision());
         assertEquals(5, storage.updateCounter());
-        assertTrue(storage.remove(key1).empty());
+        assertTrue(storage.getAndRemove(key1).empty());
         assertTrue(storage.get(key1).empty());
     }
 
     @Test
-    void compact() {
+    public void compact() {
         assertEquals(0, storage.revision());
         assertEquals(0, storage.updateCounter());
 
@@ -179,7 +384,7 @@ class SimpleInMemoryKeyValueStorageTest {
         assertEquals(6, storage.revision());
         assertEquals(6, storage.updateCounter());
 
-        storage.remove(k(3));
+        storage.getAndRemove(k(3));
 
         assertEquals(7, storage.revision());
         assertEquals(7, storage.updateCounter());
@@ -218,7 +423,7 @@ class SimpleInMemoryKeyValueStorageTest {
     }
 
     @Test
-    void iterate() {
+    public void iterate() {
         TreeMap<String, String> expFooMap = new TreeMap<>();
         TreeMap<String, String> expKeyMap = new TreeMap<>();
         TreeMap<String, String> expZooMap = new TreeMap<>();
@@ -270,13 +475,13 @@ class SimpleInMemoryKeyValueStorageTest {
 
             byte[] val = valStr.getBytes();
 
-            storage.put(key, val);
+            storage.getAndPut(key, val);
         }
     }
 
     private static void fill(KeyValueStorage storage, int keySuffix, int num) {
         for (int i = 0; i < num; i++)
-            storage.put(k(keySuffix), kv(keySuffix, i + 1));
+            storage.getAndPut(k(keySuffix), kv(keySuffix, i + 1));
     }
 
     private static byte[] k(int k) {