You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sk...@apache.org on 2021/04/27 11:30:40 UTC

[ignite-3] branch main updated: IGNITE-14407 introduced in-memory vault implementation. Fixes #105

This is an automated email from the ASF dual-hosted git repository.

sk0x50 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new 9af1037  IGNITE-14407 introduced in-memory vault implementation. Fixes #105
9af1037 is described below

commit 9af10372881d2979a06ea616d9c5dd3ffdfa5042
Author: Mirza Aliev <al...@gmail.com>
AuthorDate: Tue Apr 27 14:30:10 2021 +0300

    IGNITE-14407 introduced in-memory vault implementation. Fixes #105
    
    Signed-off-by: Slava Koptilin <sl...@gmail.com>
---
 .../ignite/internal/affinity/AffinityManager.java  |   8 +-
 .../affinity/RendezvousAffinityFunctionTest.java   |   6 +-
 .../org/apache/ignite/internal/util/ByteUtils.java | 134 ++++++++++++
 .../apache/ignite/internal/util/IgniteUtils.java   |  51 -----
 .../java/org/apache/ignite/lang/ByteArray.java     | 106 +++++++++
 .../apache/ignite/internal/app/IgnitionImpl.java   |   3 +-
 .../internal/table/distributed/TableManager.java   |   8 +-
 modules/vault/README.md                            |   4 +
 modules/vault/pom.xml                              |  12 ++
 .../apache/ignite/internal/vault/VaultManager.java | 147 ++++++++++++-
 .../vault/common/{VaultEntry.java => Entry.java}   |  32 +--
 .../VaultListener.java}                            |  33 ++-
 .../ignite/internal/vault/common/VaultWatch.java   |  85 ++++++++
 .../{VaultManager.java => common/Watcher.java}     |  36 ++--
 .../ignite/internal/vault/common/WatcherImpl.java  | 151 +++++++++++++
 .../internal/vault/impl/VaultServiceImpl.java      | 114 ++++++++++
 .../internal/vault/service/VaultService.java       |  90 ++++++++
 .../vault/impl/VaultBaseContractsTest.java         | 236 +++++++++++++++++++++
 18 files changed, 1142 insertions(+), 114 deletions(-)

diff --git a/modules/affinity/src/main/java/org/apache/ignite/internal/affinity/AffinityManager.java b/modules/affinity/src/main/java/org/apache/ignite/internal/affinity/AffinityManager.java
index 16bca40..d09094e 100644
--- a/modules/affinity/src/main/java/org/apache/ignite/internal/affinity/AffinityManager.java
+++ b/modules/affinity/src/main/java/org/apache/ignite/internal/affinity/AffinityManager.java
@@ -28,8 +28,9 @@ import org.apache.ignite.configuration.schemas.table.TablesConfiguration;
 import org.apache.ignite.internal.baseline.BaselineManager;
 import org.apache.ignite.internal.metastorage.MetaStorageManager;
 import org.apache.ignite.internal.util.ArrayUtils;
-import org.apache.ignite.internal.util.IgniteUtils;
+import org.apache.ignite.internal.util.ByteUtils;
 import org.apache.ignite.internal.vault.VaultManager;
+import org.apache.ignite.lang.ByteArray;
 import org.apache.ignite.lang.IgniteLogger;
 import org.apache.ignite.metastorage.common.Conditions;
 import org.apache.ignite.metastorage.common.Key;
@@ -141,8 +142,7 @@ public class AffinityManager {
                         UUID tblId = UUID.fromString(placeholderValue);
 
                         try {
-                            String name = new String(vaultManager.get((INTERNAL_PREFIX + tblId.toString())
-                                .getBytes(StandardCharsets.UTF_8)).get().value(), StandardCharsets.UTF_8);
+                            String name = new String(vaultManager.get(ByteArray.fromString(INTERNAL_PREFIX + tblId.toString())).get().value(), StandardCharsets.UTF_8);
 
                             int partitions = configurationMgr.configurationRegistry().getConfiguration(TablesConfiguration.KEY)
                                 .tables().get(name).partitions().value();
@@ -151,7 +151,7 @@ public class AffinityManager {
 
                             metaStorageMgr.invoke(evt.newEntry().key(),
                                 Conditions.value().eq(evt.newEntry().value()),
-                                Operations.put(IgniteUtils.toBytes(
+                                Operations.put(ByteUtils.toBytes(
                                     RendezvousAffinityFunction.assignPartitions(
                                         baselineMgr.nodes(),
                                         partitions,
diff --git a/modules/affinity/src/test/java/org/apache/ignite/internal/affinity/RendezvousAffinityFunctionTest.java b/modules/affinity/src/test/java/org/apache/ignite/internal/affinity/RendezvousAffinityFunctionTest.java
index 2dd46df..529c592 100644
--- a/modules/affinity/src/test/java/org/apache/ignite/internal/affinity/RendezvousAffinityFunctionTest.java
+++ b/modules/affinity/src/test/java/org/apache/ignite/internal/affinity/RendezvousAffinityFunctionTest.java
@@ -23,7 +23,7 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.function.Function;
-import org.apache.ignite.internal.util.IgniteUtils;
+import org.apache.ignite.internal.util.ByteUtils;
 import org.apache.ignite.lang.IgniteLogger;
 import org.apache.ignite.network.ClusterNode;
 import org.jetbrains.annotations.NotNull;
@@ -125,13 +125,13 @@ public class RendezvousAffinityFunctionTest {
             null
         );
 
-        byte[] assignmentBytes = IgniteUtils.toBytes(assignment);
+        byte[] assignmentBytes = ByteUtils.toBytes(assignment);
 
         assertNotNull(assignment);
 
         LOG.info("Assignment is serialized successfully [bytes=" + assignmentBytes.length + ']');
 
-        List<List<ClusterNode>> deserializedAssignment = (List<List<ClusterNode>>)IgniteUtils.fromBytes(assignmentBytes);
+        List<List<ClusterNode>> deserializedAssignment = (List<List<ClusterNode>>)ByteUtils.fromBytes(assignmentBytes);
 
         assertNotNull(deserializedAssignment);
 
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/ByteUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/util/ByteUtils.java
new file mode 100644
index 0000000..aefd419
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/ByteUtils.java
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.util;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import org.apache.ignite.lang.IgniteLogger;
+
+/**
+ * Utility class that provides various methods for manipulating bytes.
+ */
+public class ByteUtils {
+    /** The logger. */
+    private static final IgniteLogger LOG = IgniteLogger.forClass(ByteUtils.class);
+
+    /**
+     * Constructs {@code long} from byte array.
+     *
+     * @param bytes Array of bytes.
+     * @param off Offset in {@code bytes} array.
+     * @return Long value.
+     */
+    public static long bytesToLong(byte[] bytes, int off) {
+        assert bytes != null;
+
+        int bytesCnt = Long.SIZE >> 3;
+
+        if (off + bytesCnt > bytes.length)
+            bytesCnt = bytes.length - off;
+
+        long res = 0;
+
+        for (int i = 0; i < bytesCnt; i++) {
+            int shift = bytesCnt - i - 1 << 3;
+
+            res |= (0xffL & bytes[off++]) << shift;
+        }
+
+        return res;
+    }
+
+    /**
+     * Converts primitive {@code long} type to byte array.
+     *
+     * @param l Long value.
+     * @return Array of bytes.
+     */
+    public static byte[] longToBytes(long l) {
+        return toBytes(l, new byte[8], 0, 8);
+    }
+
+    /**
+     * Converts primitive {@code long} type to byte array and stores it in specified
+     * byte array. The highest byte in the value is the first byte in result array.
+     *
+     * @param l Unsigned long value.
+     * @param bytes Bytes array to write result to.
+     * @param off Offset in the target array to write result to.
+     * @param limit Limit of bytes to write into output.
+     * @return The {@code bytes} array with the value written into it.
+     */
+    private static byte[] toBytes(long l, byte[] bytes, int off, int limit) {
+        assert bytes != null;
+        assert limit <= 8;
+        assert bytes.length >= off + limit;
+
+        for (int i = limit - 1; i >= 0; i--) {
+            bytes[off + i] = (byte)(l & 0xFF);
+            l >>>= 8;
+        }
+
+        return bytes;
+    }
+
+    /**
+     * Serializes an object to byte array using native java serialization mechanism.
+     *
+     * @param obj Object to serialize.
+     * @return Byte array.
+     */
+    public static byte[] toBytes(Object obj) {
+        try (ByteArrayOutputStream bos = new ByteArrayOutputStream()) {
+            try (ObjectOutputStream out = new ObjectOutputStream(bos)) {
+
+                out.writeObject(obj);
+
+                out.flush();
+
+                return bos.toByteArray();
+            }
+        }
+        catch (Exception e) {
+            LOG.warn("Could not serialize a class [cls=" + obj.getClass().getName() + "]", e);
+
+            return null;
+        }
+    }
+
+    /**
+     * Deserializes an object from byte array using native java serialization mechanism.
+     *
+     * @param bytes Byte array.
+     * @return Object.
+     */
+    public static Object fromBytes(byte[] bytes) {
+        try (ByteArrayInputStream bis = new ByteArrayInputStream(bytes)) {
+            try (ObjectInputStream in = new ObjectInputStream(bis)) {
+                return in.readObject();
+            }
+        }
+        catch (Exception e) {
+            LOG.warn("Could not deserialize an object", e);
+
+            return null;
+        }
+    }
+}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
index a490af3..60d637b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
@@ -17,21 +17,13 @@
 
 package org.apache.ignite.internal.util;
 
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.ObjectInputStream;
-import java.io.ObjectOutputStream;
 import java.util.HashMap;
 import java.util.LinkedHashMap;
-import org.apache.ignite.lang.IgniteLogger;
 
 /**
  * Collection of utility methods used throughout the system.
  */
 public class IgniteUtils {
-    /** The logger. */
-    private static final IgniteLogger LOG = IgniteLogger.forClass(IgniteUtils.class);
-
     /** Version of the JDK. */
     private static String jdkVer;
 
@@ -175,47 +167,4 @@ public class IgniteUtils {
 
         return hash(val);
     }
-
-    /**
-     * Serializes an object to byte array using native java serialization mechanism.
-     *
-     * @param obj Object to serialize.
-     * @return Byte array.
-     */
-    public static byte[] toBytes(Object obj) {
-        try (ByteArrayOutputStream bos = new ByteArrayOutputStream()) {
-            try (ObjectOutputStream out = new ObjectOutputStream(bos)) {
-
-                out.writeObject(obj);
-
-                out.flush();
-
-                return bos.toByteArray();
-            }
-        }
-        catch (Exception e) {
-            LOG.warn("Could not serialize a class [cls=" + obj.getClass().getName() + "]", e);
-
-            return null;
-        }
-    }
-
-    /**
-     * Deserializes an object from byte array using native java serialization mechanism.
-     *
-     * @param bytes Byte array.
-     * @return Object.
-     */
-    public static Object fromBytes(byte[] bytes) {
-        try (ByteArrayInputStream bis = new ByteArrayInputStream(bytes)) {
-            try (ObjectInputStream in = new ObjectInputStream(bis)) {
-                return in.readObject();
-            }
-        }
-        catch (Exception e) {
-            LOG.warn("Could not deserialize an object", e);
-
-            return null;
-        }
-    }
 }
diff --git a/modules/core/src/main/java/org/apache/ignite/lang/ByteArray.java b/modules/core/src/main/java/org/apache/ignite/lang/ByteArray.java
new file mode 100644
index 0000000..c38be87
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/lang/ByteArray.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.lang;
+
+import java.nio.charset.StandardCharsets;
+import java.util.Arrays;
+import org.jetbrains.annotations.NotNull;
+
+/**
+ * A class for handling byte array.
+ */
+public final class ByteArray implements Comparable<ByteArray> {
+    /** Byte-wise representation of the {@code ByteArray}. */
+    @NotNull
+    private final byte[] arr;
+
+    /**
+     * Constructs {@code ByteArray} instance from the given byte array. <em>Note:</em> copy of the given byte array will not be
+     * created in order to avoid redundant memory consumption.
+     *
+     * @param arr Byte array. Can't be {@code null}.
+     */
+    public ByteArray(@NotNull byte[] arr) {
+        this.arr = arr;
+    }
+
+    /**
+     * Constructs {@code ByteArray} instance from the given string. {@link StandardCharsets#UTF_8} charset is used for
+     * encoding the input string.
+     *
+     * @param s The string {@code ByteArray} representation. Can't be {@code null}.
+     * @return {@code ByteArray} instance from the given string.
+     */
+    public static ByteArray fromString(@NotNull String s) {
+        return new ByteArray(s.getBytes(StandardCharsets.UTF_8));
+    }
+
+    /**
+     * Returns the {@code ByteArray} as byte array.
+     *
+     * @return Bytes of the {@code ByteArray}.
+     */
+    public byte[] bytes() {
+        return arr;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean equals(Object o) {
+        if (this == o) return true;
+
+        if (o == null || getClass() != o.getClass()) return false;
+
+        ByteArray byteArray = (ByteArray)o;
+
+        return Arrays.equals(arr, byteArray.arr);
+    }
+
+    /** {@inheritDoc} */
+    @Override public int hashCode() {
+        return Arrays.hashCode(arr);
+    }
+
+    /** {@inheritDoc} */
+    @Override public int compareTo(@NotNull ByteArray other) {
+        return Arrays.compare(this.arr, other.arr);
+    }
+
+    /**
+     * Compares two {@code ByteArray} values. The value returned is identical to what would be returned by:
+     * <pre>
+     *    x.compareTo(y)
+     * </pre>
+     * <p>
+     * where x and y are {@code ByteArray}'s
+     *
+     * @param x The first {@code ByteArray} to compare.
+     * @param y The second {@code ByteArray} to compare.
+     * @return the value {@code 0} if the first and second {@code ByteArray} are equal and contain the same elements in
+     * the same order; a value less than {@code 0} if the first {@code ByteArray} is lexicographically less than the
+     * second {@code ByteArray}; and a value greater than {@code 0} if the first {@code ByteArray} is lexicographically
+     * greater than the second {@code ByteArray}
+     */
+    public static int compare(@NotNull ByteArray x, @NotNull ByteArray y) {
+        return Arrays.compare(x.arr, y.arr);
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return new String(arr, StandardCharsets.UTF_8);
+    }
+}
diff --git a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgnitionImpl.java b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgnitionImpl.java
index 4c88f2f..2a8ef19 100644
--- a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgnitionImpl.java
+++ b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgnitionImpl.java
@@ -43,6 +43,7 @@ import org.apache.ignite.internal.storage.DistributedConfigurationStorage;
 import org.apache.ignite.internal.storage.LocalConfigurationStorage;
 import org.apache.ignite.internal.table.distributed.TableManager;
 import org.apache.ignite.internal.vault.VaultManager;
+import org.apache.ignite.internal.vault.impl.VaultServiceImpl;
 import org.apache.ignite.lang.IgniteLogger;
 import org.apache.ignite.network.ClusterLocalConfiguration;
 import org.apache.ignite.network.ClusterService;
@@ -83,7 +84,7 @@ public class IgnitionImpl implements Ignition {
         ackBanner();
 
         // Vault Component startup.
-        VaultManager vaultMgr = new VaultManager();
+        VaultManager vaultMgr = new VaultManager(new VaultServiceImpl());
 
         boolean cfgBootstrappedFromPds = vaultMgr.bootstrapped();
 
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
index 9f745d4..802d33b 100644
--- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
@@ -42,8 +42,9 @@ import org.apache.ignite.internal.table.TableSchemaView;
 import org.apache.ignite.internal.table.distributed.raft.PartitionCommandListener;
 import org.apache.ignite.internal.table.distributed.storage.InternalTableImpl;
 import org.apache.ignite.internal.util.ArrayUtils;
-import org.apache.ignite.internal.util.IgniteUtils;
+import org.apache.ignite.internal.util.ByteUtils;
 import org.apache.ignite.internal.vault.VaultManager;
+import org.apache.ignite.lang.ByteArray;
 import org.apache.ignite.lang.IgniteLogger;
 import org.apache.ignite.metastorage.common.Conditions;
 import org.apache.ignite.metastorage.common.Key;
@@ -130,13 +131,12 @@ public class TableManager implements IgniteTables {
                         UUID tblId = UUID.fromString(placeholderValue);
 
                         try {
-                            String name = new String(vaultManager.get((INTERNAL_PREFIX + tblId.toString())
-                                .getBytes(StandardCharsets.UTF_8)).get().value(), StandardCharsets.UTF_8);
+                            String name = new String(vaultManager.get(ByteArray.fromString(INTERNAL_PREFIX + tblId.toString())).get().value(), StandardCharsets.UTF_8);
 
                             int partitions = configurationMgr.configurationRegistry().getConfiguration(TablesConfiguration.KEY)
                                 .tables().get(name).partitions().value();
 
-                            List<List<ClusterNode>> assignment = (List<List<ClusterNode>>)IgniteUtils.fromBytes(
+                            List<List<ClusterNode>> assignment = (List<List<ClusterNode>>)ByteUtils.fromBytes(
                                 evt.newEntry().value());
 
                             HashMap<Integer, RaftGroupService> partitionMap = new HashMap<>(partitions);
diff --git a/modules/vault/README.md b/modules/vault/README.md
new file mode 100644
index 0000000..3ee0b01
--- /dev/null
+++ b/modules/vault/README.md
@@ -0,0 +1,4 @@
+# Ignite vault module
+This module provides Vault API implementation.
+
+Note: will be filled later
diff --git a/modules/vault/pom.xml b/modules/vault/pom.xml
index 5f640f1..f290fef 100644
--- a/modules/vault/pom.xml
+++ b/modules/vault/pom.xml
@@ -37,5 +37,17 @@
             <groupId>org.jetbrains</groupId>
             <artifactId>annotations</artifactId>
         </dependency>
+
+        <dependency>
+            <groupId>org.apache.ignite</groupId>
+            <artifactId>ignite-core</artifactId>
+        </dependency>
+
+        <!-- Test dependencies -->
+        <dependency>
+            <groupId>org.junit.jupiter</groupId>
+            <artifactId>junit-jupiter-engine</artifactId>
+            <scope>test</scope>
+        </dependency>
     </dependencies>
 </project>
diff --git a/modules/vault/src/main/java/org/apache/ignite/internal/vault/VaultManager.java b/modules/vault/src/main/java/org/apache/ignite/internal/vault/VaultManager.java
index cfc3fd0..92264c1 100644
--- a/modules/vault/src/main/java/org/apache/ignite/internal/vault/VaultManager.java
+++ b/modules/vault/src/main/java/org/apache/ignite/internal/vault/VaultManager.java
@@ -17,28 +17,163 @@
 
 package org.apache.ignite.internal.vault;
 
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
 import java.util.concurrent.CompletableFuture;
-import org.apache.ignite.internal.vault.common.VaultEntry;
+import java.util.concurrent.ExecutionException;
+import org.apache.ignite.internal.util.ByteUtils;
+import org.apache.ignite.internal.vault.common.Entry;
+import org.apache.ignite.internal.vault.common.VaultWatch;
+import org.apache.ignite.internal.vault.service.VaultService;
+import org.apache.ignite.lang.ByteArray;
+import org.apache.ignite.lang.IgniteInternalCheckedException;
+import org.jetbrains.annotations.NotNull;
 
 /**
- * VaultManager is responsible for handling VaultService lifecycle and providing interface for managing local keys.
+ * VaultManager is responsible for handling {@link VaultService} lifecycle
+ * and providing interface for managing local keys.
  */
 public class VaultManager {
+    /** Special key for vault where applied revision for {@code putAll} operation is stored. */
+    private static ByteArray APPLIED_REV = ByteArray.fromString("applied_revision");
+
+    /** Mutex. */
+    private final Object mux = new Object();
+
+    /** Instance of vault */
+    private VaultService vaultService;
+
+    /** Creates a vault manager that delegates to the given vault service.
+     *
+     * @param vaultService Instance of vault.
+     */
+    public VaultManager(VaultService vaultService) {
+        this.vaultService = vaultService;
+    }
 
     /**
      * @return {@code true} if VaultService beneath given VaultManager was bootstrapped with data
      * either from PDS or from user initial bootstrap configuration.
+     *
+     * TODO: implement when IGNITE-14408 will be ready
      */
     public boolean bootstrapped() {
         return false;
     }
 
-    // TODO: IGNITE-14405 Local persistent key-value storage (Vault).
+    /**
+     * See {@link VaultService#get(ByteArray)}
+     *
+     * @param key Key. Couldn't be {@code null}.
+     * @return An entry for the given key. Couldn't be {@code null}.
+     */
+    @NotNull public CompletableFuture<Entry> get(@NotNull ByteArray key) {
+        return vaultService.get(key);
+    }
+
+    /**
+     * See {@link VaultService#put(ByteArray, byte[])}
+     *
+     * @param key Vault key. Couldn't be {@code null}.
+     * @param val Value. Couldn't be {@code null}.
+     * @return Future representing pending completion of the operation.
+     */
+    @NotNull public CompletableFuture<Void> put(@NotNull ByteArray key, @NotNull byte[] val) {
+        return vaultService.put(key, val);
+    }
+
+    /**
+     * See {@link VaultService#remove(ByteArray)}
+     *
+     * @param key Vault key. Couldn't be {@code null}.
+     * @return Future representing pending completion of the operation.
+     */
+    @NotNull public CompletableFuture<Void> remove(@NotNull ByteArray key) {
+        return vaultService.remove(key);
+    }
+
+    /**
+     * See {@link VaultService#range(ByteArray, ByteArray)}
+     *
+     * @param fromKey Start key of range (inclusive). Couldn't be {@code null}.
+     * @param toKey End key of range (exclusive). Couldn't be {@code null}.
+     * @return Iterator built upon entries corresponding to the given range.
+     */
+    @NotNull public Iterator<Entry> range(@NotNull ByteArray fromKey, @NotNull ByteArray toKey) {
+        return vaultService.range(fromKey, toKey);
+    }
+
+    /**
+     * Inserts or updates entries with given keys and given values and non-negative revision.
+     *
+     * @param vals The map of keys and corresponding values. Couldn't be {@code null} or empty.
+     * @param revision Revision for entries. Must not be less than the applied revision stored in vault.
+     * @return Future representing pending completion of the operation.
+     * @throws IgniteInternalCheckedException If revision is inconsistent with applied revision from vault or
+     * if couldn't get applied revision from vault.
+     */
+    public CompletableFuture<Void> putAll(@NotNull Map<ByteArray, byte[]> vals, long revision) throws IgniteInternalCheckedException {
+        synchronized (mux) {
+            byte[] appliedRevBytes;
+
+            try {
+                appliedRevBytes = vaultService.get(APPLIED_REV).get().value();
+            }
+            catch (InterruptedException | ExecutionException e) {
+               throw new IgniteInternalCheckedException("Error occurred when getting applied revision", e);
+            }
+
+            long appliedRevision = appliedRevBytes != null ? ByteUtils.bytesToLong(appliedRevBytes, 0) : 0L;
+
+            if (revision < appliedRevision)
+                throw new IgniteInternalCheckedException("Inconsistency between applied revision from vault and the current revision");
+
+            HashMap<ByteArray, byte[]> mergedMap = new HashMap<>(vals);
+
+            mergedMap.put(APPLIED_REV, ByteUtils.longToBytes(revision));
+
+            return vaultService.putAll(mergedMap);
+        }
+
+    }
+
+    /**
+     * @return Applied revision for {@link VaultManager#putAll} operation.
+     * @throws IgniteInternalCheckedException If couldn't get applied revision from vault.
+     */
+    @NotNull public Long appliedRevision() throws IgniteInternalCheckedException {
+        byte[] appliedRevision;
+
+        synchronized (mux) {
+            try {
+                appliedRevision = vaultService.get(APPLIED_REV).get().value();
+            }
+            catch (InterruptedException | ExecutionException e) {
+                throw new IgniteInternalCheckedException("Error occurred when getting applied revision", e);
+            }
+
+            return appliedRevision == null ? 0L : ByteUtils.bytesToLong(appliedRevision, 0);
+        }
+    }
+
+    /**
+     * See {@link VaultService#watch(VaultWatch)}
+     *
+     * @param vaultWatch Watch which will notify for each update.
+     * @return Subscription identifier. Could be used in {@link #stopWatch} method in order to cancel subscription.
+     */
+    @NotNull public CompletableFuture<Long> watch(@NotNull VaultWatch vaultWatch) {
+        return vaultService.watch(vaultWatch);
+    }
 
     /**
-     * This is a proxy to Vault service method.
+     * See {@link VaultService#stopWatch(Long)}
+     *
+     * @param id Subscription identifier.
+     * @return Completed future in case of operation success. Couldn't be {@code null}.
      */
-    public CompletableFuture<VaultEntry> get(byte[] key) {
-        return null;
+    @NotNull public CompletableFuture<Void> stopWatch(@NotNull Long id) {
+        return vaultService.stopWatch(id);
     }
 }
diff --git a/modules/vault/src/main/java/org/apache/ignite/internal/vault/common/VaultEntry.java b/modules/vault/src/main/java/org/apache/ignite/internal/vault/common/Entry.java
similarity index 61%
rename from modules/vault/src/main/java/org/apache/ignite/internal/vault/common/VaultEntry.java
rename to modules/vault/src/main/java/org/apache/ignite/internal/vault/common/Entry.java
index d35e245..3d4566e 100644
--- a/modules/vault/src/main/java/org/apache/ignite/internal/vault/common/VaultEntry.java
+++ b/modules/vault/src/main/java/org/apache/ignite/internal/vault/common/Entry.java
@@ -17,45 +17,51 @@
 
 package org.apache.ignite.internal.vault.common;
 
-import java.io.Serializable;
+import org.apache.ignite.lang.ByteArray;
 import org.jetbrains.annotations.NotNull;
+import org.jetbrains.annotations.Nullable;
 
 /**
- * Representation of vault entry.
+ * Represents a vault unit as entry with key and value, where
+ * <ul>
+ *     <li>key - a unique entry's key. Keys are comparable in lexicographic manner and represented as a {@link ByteArray}.</li>
+ *     <li>value - a data which is associated with a key and represented as an array of bytes.</li>
+ * </ul>
  */
-public class VaultEntry implements Serializable {
+// TODO: need to generify with metastorage Entry https://issues.apache.org/jira/browse/IGNITE-14653
+public final class Entry {
     /** Key. */
-    private byte[] key;
+    private final ByteArray key;
 
     /** Value. */
-    private byte[] val;
+    private final byte[] val;
 
     /**
      * Constructs {@code VaultEntry} instance from the given key and value.
      *
-     * @param key Key as a {@code ByteArray}.
+     * @param key Key as a {@code ByteArray}. Couldn't be null.
      * @param val Value as a {@code byte[]}.
      */
-    public VaultEntry(byte[] key, byte[] val) {
+    public Entry(@NotNull ByteArray key, byte[] val) {
         this.key = key;
         this.val = val;
     }
 
     /**
-     * Gets a key bytes.
+     * Returns the key of this entry as a {@code ByteArray}.
      *
-     * @return Byte array.
+     * @return The {@code ByteArray}.
      */
-    public @NotNull byte[] key() {
+    @NotNull public ByteArray key() {
         return key;
     }
 
     /**
-     * Gets a value bytes.
+     * Returns a value. Could be {@code null} for empty entry.
      *
-     * @return Byte array.
+     * @return Value.
      */
-    public @NotNull byte[] value() {
+    @Nullable public byte[] value() {
         return val;
     }
 
diff --git a/modules/vault/src/main/java/org/apache/ignite/internal/vault/VaultManager.java b/modules/vault/src/main/java/org/apache/ignite/internal/vault/common/VaultListener.java
similarity index 51%
copy from modules/vault/src/main/java/org/apache/ignite/internal/vault/VaultManager.java
copy to modules/vault/src/main/java/org/apache/ignite/internal/vault/common/VaultListener.java
index cfc3fd0..1938a9e 100644
--- a/modules/vault/src/main/java/org/apache/ignite/internal/vault/VaultManager.java
+++ b/modules/vault/src/main/java/org/apache/ignite/internal/vault/common/VaultListener.java
@@ -15,30 +15,29 @@
  * limitations under the License.
  */
 
-package org.apache.ignite.internal.vault;
+package org.apache.ignite.internal.vault.common;
 
-import java.util.concurrent.CompletableFuture;
-import org.apache.ignite.internal.vault.common.VaultEntry;
+import org.jetbrains.annotations.NotNull;
 
 /**
- * VaultManager is responsible for handling VaultService lifecycle and providing interface for managing local keys.
+ * Vault storage listener for changes.
  */
-public class VaultManager {
-
+//TODO: need to generify with metastorage WatchListener https://issues.apache.org/jira/browse/IGNITE-14653
+public interface VaultListener {
     /**
-     * @return {@code true} if VaultService beneath given VaultManager was bootstrapped with data
-     * either from PDS or from user initial bootstrap configuration.
+     * Called on each vault update.
+     *
+     * @param entries A single entry or a batch.
+     * @return {@code true} if the listener must continue handling events; if {@code false} is returned, the
+     * listener and the corresponding watch will be unregistered.
      */
-    public boolean bootstrapped() {
-        return false;
-    }
-
-    // TODO: IGNITE-14405 Local persistent key-value storage (Vault).
+    boolean onUpdate(@NotNull Iterable<Entry> entries);
 
     /**
-     * This is a proxy to Vault service method.
+     * Called when an error occurs. The listener and the corresponding watch will be
+     * unregistered.
+     *
+     * @param e Exception.
      */
-    public CompletableFuture<VaultEntry> get(byte[] key) {
-        return null;
-    }
+    void onError(@NotNull Throwable e);
 }
diff --git a/modules/vault/src/main/java/org/apache/ignite/internal/vault/common/VaultWatch.java b/modules/vault/src/main/java/org/apache/ignite/internal/vault/common/VaultWatch.java
new file mode 100644
index 0000000..51b2ca8
--- /dev/null
+++ b/modules/vault/src/main/java/org/apache/ignite/internal/vault/common/VaultWatch.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.vault.common;
+
+import java.util.Collections;
+import java.util.Comparator;
+import org.apache.ignite.lang.ByteArray;
+
+/**
+ * Watch for vault entries.
+ * Could be specified by range of keys.
+ * If value of key in range is changed, then corresponding listener will be triggered.
+ */
+public final class VaultWatch {
+    /** Comparator for {@code ByteArray} keys. */
+    private static final Comparator<ByteArray> CMP = ByteArray::compare;
+
+    /**
+     * Start key of the watched range (inclusive); {@code null} means the range has no lower bound.
+     * An update of a key below this bound does not trigger the listener.
+     */
+    private final ByteArray startKey;
+
+    /**
+     * End key of the watched range (exclusive); {@code null} means the range has no upper bound.
+     * An update of a key at or above this bound does not trigger the listener.
+     */
+    private final ByteArray endKey;
+
+    /** Listener for vault's values updates; never reassigned after construction. */
+    private final VaultListener listener;
+
+    /**
+     * @param startKey Start key of range (inclusive), or {@code null} for no lower bound.
+     * @param endkey End key of range (exclusive), or {@code null} for no upper bound.
+     * @param listener Listener notified about updates of keys inside the range.
+     */
+    public VaultWatch(ByteArray startKey, ByteArray endkey, VaultListener listener) {
+        this.startKey = startKey;
+        this.endKey = endkey;
+        this.listener = listener;
+    }
+
+    /**
+     * Notifies the listener if the key of {@code val} lies in the range {@code [startKey, endKey)}.
+     *
+     * @param val Changed vault entry.
+     * @return {@code false} only if the listener was invoked and asked to stop; in that case the listener and
+     * corresponding watch will be unregistered. {@code true} otherwise, including for keys outside the range.
+     */
+    public boolean notify(Entry val) {
+        if (startKey != null && CMP.compare(val.key(), startKey) < 0)
+            return true;
+
+        if (endKey != null && CMP.compare(val.key(), endKey) >= 0)
+            return true;
+
+        return listener.onUpdate(Collections.singleton(val));
+    }
+
+    /**
+     * Forwards an error to the listener. The watch and corresponding listener will be
+     * unregistered.
+     *
+     * @param e Exception.
+     */
+    public void onError(Throwable e) {
+        listener.onError(e);
+    }
+}
diff --git a/modules/vault/src/main/java/org/apache/ignite/internal/vault/VaultManager.java b/modules/vault/src/main/java/org/apache/ignite/internal/vault/common/Watcher.java
similarity index 56%
copy from modules/vault/src/main/java/org/apache/ignite/internal/vault/VaultManager.java
copy to modules/vault/src/main/java/org/apache/ignite/internal/vault/common/Watcher.java
index cfc3fd0..a5623c0 100644
--- a/modules/vault/src/main/java/org/apache/ignite/internal/vault/VaultManager.java
+++ b/modules/vault/src/main/java/org/apache/ignite/internal/vault/common/Watcher.java
@@ -15,30 +15,36 @@
  * limitations under the License.
  */
 
-package org.apache.ignite.internal.vault;
+package org.apache.ignite.internal.vault.common;
 
 import java.util.concurrent.CompletableFuture;
-import org.apache.ignite.internal.vault.common.VaultEntry;
+import org.jetbrains.annotations.NotNull;
 
 /**
- * VaultManager is responsible for handling VaultService lifecycle and providing interface for managing local keys.
+ * Vault watcher.
+ *
+ * Watches for vault entries updates.
  */
-public class VaultManager {
-
+public interface Watcher {
     /**
-     * @return {@code true} if VaultService beneath given VaultManager was bootstrapped with data
-     * either from PDS or from user initial bootstrap configuration.
+     * Registers a watch for vault entry updates.
+     *
+     * @param vaultWatch Vault watch.
+     * @return Future completed with the id of the registered watch.
      */
-    public boolean bootstrapped() {
-        return false;
-    }
+    CompletableFuture<Long> register(@NotNull VaultWatch vaultWatch);
 
-    // TODO: IGNITE-14405 Local persistent key-value storage (Vault).
+    /**
+     * Notifies the watcher that the vault entry {@code val} was changed.
+     *
+     * @param val Vault entry.
+     */
+    void notify(@NotNull Entry val);
 
     /**
-     * This is a proxy to Vault service method.
+     * Cancels the watch with the specified {@code id}.
+     *
+     * @param id Id of watch.
      */
-    public CompletableFuture<VaultEntry> get(byte[] key) {
-        return null;
-    }
+    void cancel(@NotNull Long id);
 }
diff --git a/modules/vault/src/main/java/org/apache/ignite/internal/vault/common/WatcherImpl.java b/modules/vault/src/main/java/org/apache/ignite/internal/vault/common/WatcherImpl.java
new file mode 100644
index 0000000..90187b3
--- /dev/null
+++ b/modules/vault/src/main/java/org/apache/ignite/internal/vault/common/WatcherImpl.java
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.vault.common;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+import org.apache.ignite.lang.IgniteInternalCheckedException;
+import org.apache.ignite.lang.IgniteLogger;
+import org.jetbrains.annotations.NotNull;
+
+/**
+ * Implementation of vault {@link Watcher}.
+ */
+public class WatcherImpl implements Watcher {
+    /** Logger. */
+    private static final IgniteLogger LOG = IgniteLogger.forClass(WatcherImpl.class);
+
+    /** Queue of changed vault entries awaiting delivery to the watches. */
+    private final BlockingQueue<Entry> queue = new LinkedBlockingQueue<>();
+
+    /** Registered vault watches by id. Guarded by {@link #mux}. */
+    private final Map<Long, VaultWatch> watches = new HashMap<>();
+
+    /** Flag for indicating if watcher is stopped. */
+    private volatile boolean stop;
+
+    /** Counter for generating ids for watches. */
+    private final AtomicLong watchIds;
+
+    /** Mutex. */
+    private final Object mux = new Object();
+
+    /** Execution service which runs thread for processing changed vault entries. */
+    private final ExecutorService exec;
+
+    /**
+     * Creates the watcher and starts the single worker thread that delivers updates to the watches.
+     */
+    public WatcherImpl() {
+        watchIds = new AtomicLong(0);
+
+        exec = Executors.newFixedThreadPool(1);
+
+        exec.execute(new WatcherWorker());
+    }
+
+    /** {@inheritDoc} */
+    @Override public CompletableFuture<Long> register(@NotNull VaultWatch vaultWatch) {
+        synchronized (mux) {
+            Long key = watchIds.incrementAndGet();
+
+            watches.put(key, vaultWatch);
+
+            return CompletableFuture.completedFuture(key);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public void notify(@NotNull Entry val) {
+        queue.offer(val);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void cancel(@NotNull Long id) {
+        synchronized (mux) {
+            watches.remove(id);
+        }
+    }
+
+    /**
+     * Stops the worker thread and waits for the executor service to terminate.
+     */
+    public void shutdown() {
+        stop = true;
+
+        if (exec != null) {
+            List<Runnable> tasks = exec.shutdownNow();
+
+            if (!tasks.isEmpty())
+                LOG.warn("Runnable tasks outlived thread pool executor service");
+
+            try {
+                exec.awaitTermination(Long.MAX_VALUE, TimeUnit.MILLISECONDS);
+            }
+            catch (InterruptedException e) {
+                LOG.warn("Got interrupted while waiting for executor service to stop.");
+
+                exec.shutdownNow();
+
+                // Preserve interrupt status.
+                Thread.currentThread().interrupt();
+            }
+        }
+    }
+
+    /**
+     * Worker that polls changed vault entries from queue and notifies registered watches.
+     */
+    private class WatcherWorker implements Runnable {
+        /** {@inheritDoc} */
+        @Override public void run() {
+            while (!stop) {
+                try {
+                    Entry val = queue.take();
+
+                    synchronized (mux) {
+                        // Unregister through removeIf(): calling cancel() from inside watches.forEach()
+                        // modifies the map being iterated and ends in ConcurrentModificationException.
+                        watches.values().removeIf(w -> !w.notify(val));
+                    }
+                }
+                catch (InterruptedException e) {
+                    synchronized (mux) {
+                        // The interruption itself is the cause: e.getCause() is normally null.
+                        IgniteInternalCheckedException err =
+                            new IgniteInternalCheckedException("Error occurred during watches handling ", e);
+
+                        watches.values().forEach(w -> w.onError(err));
+
+                        // All watches were notified about the failure; unregister them at once instead of
+                        // removing them one by one while iterating.
+                        watches.clear();
+                    }
+                }
+            }
+        }
+    }
+}
diff --git a/modules/vault/src/main/java/org/apache/ignite/internal/vault/impl/VaultServiceImpl.java b/modules/vault/src/main/java/org/apache/ignite/internal/vault/impl/VaultServiceImpl.java
new file mode 100644
index 0000000..f9b6056
--- /dev/null
+++ b/modules/vault/src/main/java/org/apache/ignite/internal/vault/impl/VaultServiceImpl.java
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.vault.impl;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.NavigableMap;
+import java.util.TreeMap;
+import java.util.concurrent.CompletableFuture;
+import org.apache.ignite.internal.vault.common.Entry;
+import org.apache.ignite.internal.vault.common.VaultWatch;
+import org.apache.ignite.internal.vault.common.WatcherImpl;
+import org.apache.ignite.internal.vault.service.VaultService;
+import org.apache.ignite.lang.ByteArray;
+import org.jetbrains.annotations.NotNull;
+
+/**
+ * Simple in-memory representation of vault. Only for test purposes.
+ */
+public class VaultServiceImpl implements VaultService {
+    /** Map to store values. Guarded by {@link #mux}. */
+    private final NavigableMap<ByteArray, byte[]> storage = new TreeMap<>();
+
+    /** Mutex. */
+    private final Object mux = new Object();
+
+    /** Watcher that delivers storage updates to the registered watches. */
+    private final WatcherImpl watcher = new WatcherImpl();
+
+    /** {@inheritDoc} */
+    @Override @NotNull public CompletableFuture<Entry> get(@NotNull ByteArray key) {
+        synchronized (mux) {
+            return CompletableFuture.completedFuture(new Entry(key, storage.get(key)));
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override @NotNull public CompletableFuture<Void> put(@NotNull ByteArray key, @NotNull byte[] val) {
+        synchronized (mux) {
+            storage.put(key, val);
+
+            watcher.notify(new Entry(key, val));
+
+            return CompletableFuture.completedFuture(null);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override @NotNull public CompletableFuture<Void> remove(@NotNull ByteArray key) {
+        synchronized (mux) {
+            storage.remove(key);
+
+            // Report the removal to watches as an entry without a value, the same way put() reports a write.
+            watcher.notify(new Entry(key, null));
+
+            return CompletableFuture.completedFuture(null);
+        }
+    }
+
+    /** {@inheritDoc} */
+    //TODO: use Cursor instead of Iterator https://issues.apache.org/jira/browse/IGNITE-14654
+    @Override @NotNull public Iterator<Entry> range(@NotNull ByteArray fromKey, @NotNull ByteArray toKey) {
+        synchronized (mux) {
+            ArrayList<Entry> res = new ArrayList<>();
+
+            storage.subMap(fromKey, toKey).forEach((k, v) -> res.add(new Entry(new ByteArray(k.bytes()), v.clone())));
+
+            return res.iterator();
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public @NotNull CompletableFuture<Long> watch(@NotNull VaultWatch vaultWatch) {
+        synchronized (mux) {
+            return watcher.register(vaultWatch);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public @NotNull CompletableFuture<Void> stopWatch(@NotNull Long id) {
+        synchronized (mux) {
+            watcher.cancel(id);
+
+            return CompletableFuture.completedFuture(null);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public @NotNull CompletableFuture<Void> putAll(@NotNull Map<ByteArray, byte[]> vals) {
+        synchronized (mux) {
+            storage.putAll(vals);
+
+            vals.forEach((k, v) -> watcher.notify(new Entry(k, v)));
+
+            return CompletableFuture.completedFuture(null);
+        }
+    }
+}
diff --git a/modules/vault/src/main/java/org/apache/ignite/internal/vault/service/VaultService.java b/modules/vault/src/main/java/org/apache/ignite/internal/vault/service/VaultService.java
new file mode 100644
index 0000000..247cf4f
--- /dev/null
+++ b/modules/vault/src/main/java/org/apache/ignite/internal/vault/service/VaultService.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.vault.service;
+
+import java.util.Iterator;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import org.apache.ignite.internal.vault.common.Entry;
+import org.apache.ignite.internal.vault.common.VaultWatch;
+import org.apache.ignite.lang.ByteArray;
+import org.jetbrains.annotations.NotNull;
+
+/**
+ * Defines interface for accessing to a vault service.
+ */
+// TODO: need to generify with MetastorageService https://issues.apache.org/jira/browse/IGNITE-14653
+public interface VaultService {
+    /**
+     * Retrieves an entry for the given key.
+     *
+     * @param key Key. Couldn't be {@code null}.
+     * @return Future completed with an entry for the given key. Couldn't be {@code null}.
+     */
+    @NotNull CompletableFuture<Entry> get(@NotNull ByteArray key);
+
+    /**
+     * Writes a value with the given key to the vault.
+     *
+     * @param key Vault key. Couldn't be {@code null}.
+     * @param val Value. Couldn't be {@code null}.
+     * @return Future representing pending completion of the operation.
+     */
+    @NotNull CompletableFuture<Void> put(@NotNull ByteArray key, @NotNull byte[] val);
+
+    /**
+     * Removes the value with the given key from the vault.
+     *
+     * @param key Vault key. Couldn't be {@code null}.
+     * @return Future representing pending completion of the operation.
+     */
+    @NotNull CompletableFuture<Void> remove(@NotNull ByteArray key);
+
+    /**
+     * Returns the vault entries whose keys range from {@code fromKey}, inclusive, to {@code toKey}, exclusive.
+     *
+     * @param fromKey Start key of range (inclusive). Couldn't be {@code null}.
+     * @param toKey End key of range (exclusive). Couldn't be {@code null}.
+     * @return Iterator built upon entries corresponding to the given range.
+     */
+    @NotNull Iterator<Entry> range(@NotNull ByteArray fromKey, @NotNull ByteArray toKey);
+
+    /**
+     * Subscribes on vault storage updates for the keys covered by the given watch.
+     *
+     * @param vaultWatch Watch which will notify for each update.
+     * @return Subscription identifier. Could be used in {@link #stopWatch} method in order to cancel subscription.
+     */
+    @NotNull CompletableFuture<Long> watch(@NotNull VaultWatch vaultWatch);
+
+    /**
+     * Cancels subscription for the given identifier.
+     *
+     * @param id Subscription identifier.
+     * @return Completed future in case of operation success. Couldn't be {@code null}.
+     */
+    @NotNull CompletableFuture<Void> stopWatch(@NotNull Long id);
+
+    /**
+     * Inserts or updates entries with given keys and given values.
+     *
+     * @param vals The map of keys and corresponding values. Couldn't be {@code null} or empty.
+     * @return Completed future.
+     */
+    @NotNull CompletableFuture<Void> putAll(@NotNull Map<ByteArray, byte[]> vals);
+}
diff --git a/modules/vault/src/test/java/org/apache/ignite/internal/vault/impl/VaultBaseContractsTest.java b/modules/vault/src/test/java/org/apache/ignite/internal/vault/impl/VaultBaseContractsTest.java
new file mode 100644
index 0000000..3a11a4d
--- /dev/null
+++ b/modules/vault/src/test/java/org/apache/ignite/internal/vault/impl/VaultBaseContractsTest.java
@@ -0,0 +1,236 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.vault.impl;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import org.apache.ignite.internal.vault.VaultManager;
+import org.apache.ignite.internal.vault.common.Entry;
+import org.apache.ignite.internal.vault.common.VaultListener;
+import org.apache.ignite.internal.vault.common.VaultWatch;
+import org.apache.ignite.lang.ByteArray;
+import org.apache.ignite.lang.IgniteInternalCheckedException;
+import org.jetbrains.annotations.NotNull;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import static org.junit.jupiter.api.Assertions.assertArrayEquals;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/**
+ * Test for base vault manager contracts.
+ */
+public class VaultBaseContractsTest {
+    /** Vault manager under test, recreated before each test. */
+    private VaultManager vaultManager;
+
+    /**
+     * Instantiates a vault manager backed by the in-memory vault service.
+     */
+    @BeforeEach
+    public void setUp() {
+        vaultManager = new VaultManager(new VaultServiceImpl());
+    }
+
+    /**
+     * put contract: a written value is returned by get, also after the same key is written again.
+     */
+    @Test
+    public void put() throws ExecutionException, InterruptedException {
+        ByteArray key = getKey(1);
+        byte[] val = getValue(key, 1);
+
+        assertNull(vaultManager.get(key).get().value());
+
+        vaultManager.put(key, val);
+
+        Entry v = vaultManager.get(key).get();
+
+        assertFalse(v.empty());
+        assertArrayEquals(val, v.value());
+
+        vaultManager.put(key, val);
+
+        v = vaultManager.get(key).get();
+
+        assertFalse(v.empty());
+        assertArrayEquals(val, v.value());
+    }
+
+    /**
+     * remove contract: removing a missing key is harmless, removing an existing key makes its value null.
+     */
+    @Test
+    public void remove() throws ExecutionException, InterruptedException {
+        ByteArray key = getKey(1);
+        byte[] val = getValue(key, 1);
+
+        assertNull(vaultManager.get(key).get().value());
+
+        // Remove non-existent value.
+        vaultManager.remove(key);
+
+        assertNull(vaultManager.get(key).get().value());
+
+        vaultManager.put(key, val);
+
+        Entry v = vaultManager.get(key).get();
+
+        assertFalse(v.empty());
+        assertArrayEquals(val, v.value());
+
+        // Remove existent value.
+        vaultManager.remove(key);
+
+        v = vaultManager.get(key).get();
+
+        assertNull(v.value());
+    }
+
+    /**
+     * range contract: the start key is inclusive, the end key is exclusive, entries come in key order.
+     */
+    @Test
+    public void range() throws ExecutionException, InterruptedException {
+        ByteArray key;
+
+        Map<ByteArray, byte[]> values = new HashMap<>();
+
+        for (int i = 0; i < 10; i++) {
+            key = getKey(i);
+
+            values.put(key, getValue(key, i));
+
+            assertNull(vaultManager.get(key).get().value());
+        }
+
+        values.forEach((k, v) -> vaultManager.put(k, v));
+
+        for (Map.Entry<ByteArray, byte[]> entry : values.entrySet())
+            assertArrayEquals(entry.getValue(), vaultManager.get(entry.getKey()).get().value());
+
+        Iterator<Entry> it = vaultManager.range(getKey(3), getKey(7));
+
+        List<Entry> rangeRes = new ArrayList<>();
+
+        it.forEachRemaining(rangeRes::add);
+
+        assertEquals(4, rangeRes.size());
+
+        //Check that we have exact range from "key3" to "key6"
+        for (int i = 3; i < 7; i++)
+            assertArrayEquals(values.get(getKey(i)), rangeRes.get(i - 3).value());
+    }
+
+    /**
+     * watch contract: updates of keys inside the watched range reach the listener, which runs on another thread.
+     */
+    @Test
+    public void watch() throws ExecutionException, InterruptedException {
+        ByteArray key;
+
+        Map<ByteArray, byte[]> values = new HashMap<>();
+
+        for (int i = 0; i < 10; i++) {
+            key = getKey(i);
+
+            values.put(key, getValue(key, i));
+        }
+
+        values.forEach((k, v) -> vaultManager.put(k, v));
+
+        for (Map.Entry<ByteArray, byte[]> entry : values.entrySet())
+            assertArrayEquals(entry.getValue(), vaultManager.get(entry.getKey()).get().value());
+
+        CountDownLatch counter = new CountDownLatch(4);
+
+        VaultWatch vaultWatch = new VaultWatch(getKey(3), getKey(7), new VaultListener() {
+            @Override public boolean onUpdate(@NotNull Iterable<Entry> entries) {
+                counter.countDown();
+
+                return true;
+            }
+
+            @Override public void onError(@NotNull Throwable e) {
+                // no-op
+            }
+        });
+
+        vaultManager.watch(vaultWatch);
+
+        for (int i = 3; i < 7; i++)
+            vaultManager.put(getKey(i), ("new" + i).getBytes());
+
+        assertTrue(counter.await(10, TimeUnit.SECONDS));
+    }
+
+    /**
+     * putAll contract: all entries become readable and the passed revision becomes the applied revision.
+     */
+    @Test
+    public void putAllAndRevision() throws ExecutionException, InterruptedException, IgniteInternalCheckedException {
+        Map<ByteArray, byte[]> entries = new HashMap<>();
+
+        int entriesNum = 100;
+
+        for (int i = 0; i < entriesNum; i++) {
+            ByteArray key = getKey(i);
+
+            entries.put(key, getValue(key, i));
+        }
+
+        for (int i = 0; i < entriesNum; i++) {
+            ByteArray key = getKey(i);
+
+            assertNull(vaultManager.get(key).get().value());
+        }
+
+        vaultManager.putAll(entries, 1L);
+
+        for (int i = 0; i < entriesNum; i++) {
+            ByteArray key = getKey(i);
+
+            assertArrayEquals(entries.get(key), vaultManager.get(key).get().value());
+        }
+
+        assertEquals(1L, vaultManager.appliedRevision());
+    }
+
+    /**
+     * Creates key for vault entry.
+     */
+    private static ByteArray getKey(int k) {
+        return ByteArray.fromString("key" + k);
+    }
+
+    /**
+     * Creates value represented by byte array.
+     */
+    private static byte[] getValue(ByteArray k, int v) {
+        return ("key" + k + '_' + "val" + v).getBytes();
+    }
+}