You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sk...@apache.org on 2021/04/27 11:30:40 UTC
[ignite-3] branch main updated: IGNITE-14407 introduced in-memory
vault implementation. Fixes #105
This is an automated email from the ASF dual-hosted git repository.
sk0x50 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new 9af1037 IGNITE-14407 introduced in-memory vault implementation. Fixes #105
9af1037 is described below
commit 9af10372881d2979a06ea616d9c5dd3ffdfa5042
Author: Mirza Aliev <al...@gmail.com>
AuthorDate: Tue Apr 27 14:30:10 2021 +0300
IGNITE-14407 introduced in-memory vault implementation. Fixes #105
Signed-off-by: Slava Koptilin <sl...@gmail.com>
---
.../ignite/internal/affinity/AffinityManager.java | 8 +-
.../affinity/RendezvousAffinityFunctionTest.java | 6 +-
.../org/apache/ignite/internal/util/ByteUtils.java | 134 ++++++++++++
.../apache/ignite/internal/util/IgniteUtils.java | 51 -----
.../java/org/apache/ignite/lang/ByteArray.java | 106 +++++++++
.../apache/ignite/internal/app/IgnitionImpl.java | 3 +-
.../internal/table/distributed/TableManager.java | 8 +-
modules/vault/README.md | 4 +
modules/vault/pom.xml | 12 ++
.../apache/ignite/internal/vault/VaultManager.java | 147 ++++++++++++-
.../vault/common/{VaultEntry.java => Entry.java} | 32 +--
.../VaultListener.java} | 33 ++-
.../ignite/internal/vault/common/VaultWatch.java | 85 ++++++++
.../{VaultManager.java => common/Watcher.java} | 36 ++--
.../ignite/internal/vault/common/WatcherImpl.java | 151 +++++++++++++
.../internal/vault/impl/VaultServiceImpl.java | 114 ++++++++++
.../internal/vault/service/VaultService.java | 90 ++++++++
.../vault/impl/VaultBaseContractsTest.java | 236 +++++++++++++++++++++
18 files changed, 1142 insertions(+), 114 deletions(-)
diff --git a/modules/affinity/src/main/java/org/apache/ignite/internal/affinity/AffinityManager.java b/modules/affinity/src/main/java/org/apache/ignite/internal/affinity/AffinityManager.java
index 16bca40..d09094e 100644
--- a/modules/affinity/src/main/java/org/apache/ignite/internal/affinity/AffinityManager.java
+++ b/modules/affinity/src/main/java/org/apache/ignite/internal/affinity/AffinityManager.java
@@ -28,8 +28,9 @@ import org.apache.ignite.configuration.schemas.table.TablesConfiguration;
import org.apache.ignite.internal.baseline.BaselineManager;
import org.apache.ignite.internal.metastorage.MetaStorageManager;
import org.apache.ignite.internal.util.ArrayUtils;
-import org.apache.ignite.internal.util.IgniteUtils;
+import org.apache.ignite.internal.util.ByteUtils;
import org.apache.ignite.internal.vault.VaultManager;
+import org.apache.ignite.lang.ByteArray;
import org.apache.ignite.lang.IgniteLogger;
import org.apache.ignite.metastorage.common.Conditions;
import org.apache.ignite.metastorage.common.Key;
@@ -141,8 +142,7 @@ public class AffinityManager {
UUID tblId = UUID.fromString(placeholderValue);
try {
- String name = new String(vaultManager.get((INTERNAL_PREFIX + tblId.toString())
- .getBytes(StandardCharsets.UTF_8)).get().value(), StandardCharsets.UTF_8);
+ String name = new String(vaultManager.get(ByteArray.fromString(INTERNAL_PREFIX + tblId.toString())).get().value(), StandardCharsets.UTF_8);
int partitions = configurationMgr.configurationRegistry().getConfiguration(TablesConfiguration.KEY)
.tables().get(name).partitions().value();
@@ -151,7 +151,7 @@ public class AffinityManager {
metaStorageMgr.invoke(evt.newEntry().key(),
Conditions.value().eq(evt.newEntry().value()),
- Operations.put(IgniteUtils.toBytes(
+ Operations.put(ByteUtils.toBytes(
RendezvousAffinityFunction.assignPartitions(
baselineMgr.nodes(),
partitions,
diff --git a/modules/affinity/src/test/java/org/apache/ignite/internal/affinity/RendezvousAffinityFunctionTest.java b/modules/affinity/src/test/java/org/apache/ignite/internal/affinity/RendezvousAffinityFunctionTest.java
index 2dd46df..529c592 100644
--- a/modules/affinity/src/test/java/org/apache/ignite/internal/affinity/RendezvousAffinityFunctionTest.java
+++ b/modules/affinity/src/test/java/org/apache/ignite/internal/affinity/RendezvousAffinityFunctionTest.java
@@ -23,7 +23,7 @@ import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.function.Function;
-import org.apache.ignite.internal.util.IgniteUtils;
+import org.apache.ignite.internal.util.ByteUtils;
import org.apache.ignite.lang.IgniteLogger;
import org.apache.ignite.network.ClusterNode;
import org.jetbrains.annotations.NotNull;
@@ -125,13 +125,13 @@ public class RendezvousAffinityFunctionTest {
null
);
- byte[] assignmentBytes = IgniteUtils.toBytes(assignment);
+ byte[] assignmentBytes = ByteUtils.toBytes(assignment);
assertNotNull(assignment);
LOG.info("Assignment is serialized successfully [bytes=" + assignmentBytes.length + ']');
- List<List<ClusterNode>> deserializedAssignment = (List<List<ClusterNode>>)IgniteUtils.fromBytes(assignmentBytes);
+ List<List<ClusterNode>> deserializedAssignment = (List<List<ClusterNode>>)ByteUtils.fromBytes(assignmentBytes);
assertNotNull(deserializedAssignment);
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/ByteUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/util/ByteUtils.java
new file mode 100644
index 0000000..aefd419
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/ByteUtils.java
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.util;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import org.apache.ignite.lang.IgniteLogger;
+
+/**
+ * Utility class that provides various methods for manipulating bytes.
+ */
+public class ByteUtils {
+ /** The logger. */
+ private static final IgniteLogger LOG = IgniteLogger.forClass(ByteUtils.class);
+
+ /**
+ * Constructs {@code long} from up to 8 bytes of a byte array, most significant byte first.
+ *
+ * @param bytes Array of bytes.
+ * @param off Offset in {@code bytes} array. If fewer than 8 bytes remain from it, only those bytes are read.
+ * @return Long value.
+ */
+ public static long bytesToLong(byte[] bytes, int off) {
+ assert bytes != null;
+
+ int bytesCnt = Long.SIZE >> 3;
+
+ if (off + bytesCnt > bytes.length)
+ bytesCnt = bytes.length - off;
+
+ long res = 0;
+
+ for (int i = 0; i < bytesCnt; i++) {
+ int shift = bytesCnt - i - 1 << 3; // Parsed as (bytesCnt - i - 1) << 3, i.e. the bit offset of byte i.
+
+ res |= (0xffL & bytes[off++]) << shift;
+ }
+
+ return res;
+ }
+
+ /**
+ * Converts primitive {@code long} type to a new 8-byte array, highest byte first.
+ *
+ * @param l Long value.
+ * @return Array of bytes.
+ */
+ public static byte[] longToBytes(long l) {
+ return toBytes(l, new byte[8], 0, 8);
+ }
+
+ /**
+ * Converts primitive {@code long} type to byte array and stores it in specified
+ * byte array. The highest byte in the value is the first byte in result array.
+ *
+ * @param l Long value (its bits are written as-is).
+ * @param bytes Bytes array to write result to.
+ * @param off Offset in the target array to write result to.
+ * @param limit Number of low-order bytes of {@code l} to write (at most 8).
+ * @return The {@code bytes} array that was passed in, after the write.
+ */
+ private static byte[] toBytes(long l, byte[] bytes, int off, int limit) {
+ assert bytes != null;
+ assert limit <= 8;
+ assert bytes.length >= off + limit;
+
+ for (int i = limit - 1; i >= 0; i--) {
+ bytes[off + i] = (byte)(l & 0xFF);
+ l >>>= 8;
+ }
+
+ return bytes;
+ }
+
+ /**
+ * Serializes an object to byte array using native java serialization mechanism.
+ *
+ * @param obj Object to serialize.
+ * @return Byte array, or {@code null} if serialization failed (the failure is logged as a warning).
+ */
+ public static byte[] toBytes(Object obj) {
+ try (ByteArrayOutputStream bos = new ByteArrayOutputStream()) {
+ try (ObjectOutputStream out = new ObjectOutputStream(bos)) {
+
+ out.writeObject(obj);
+
+ out.flush();
+
+ return bos.toByteArray();
+ }
+ }
+ catch (Exception e) {
+ LOG.warn("Could not serialize a class [cls=" + obj.getClass().getName() + "]", e);
+
+ return null;
+ }
+ }
+
+ /**
+ * Deserializes an object from byte array using native java serialization mechanism.
+ *
+ * @param bytes Byte array.
+ * @return Deserialized object, or {@code null} if deserialization failed (the failure is logged as a warning).
+ */
+ public static Object fromBytes(byte[] bytes) {
+ try (ByteArrayInputStream bis = new ByteArrayInputStream(bytes)) {
+ try (ObjectInputStream in = new ObjectInputStream(bis)) {
+ return in.readObject();
+ }
+ }
+ catch (Exception e) {
+ LOG.warn("Could not deserialize an object", e);
+
+ return null;
+ }
+ }
+}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
index a490af3..60d637b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
@@ -17,21 +17,13 @@
package org.apache.ignite.internal.util;
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.ObjectInputStream;
-import java.io.ObjectOutputStream;
import java.util.HashMap;
import java.util.LinkedHashMap;
-import org.apache.ignite.lang.IgniteLogger;
/**
* Collection of utility methods used throughout the system.
*/
public class IgniteUtils {
- /** The logger. */
- private static final IgniteLogger LOG = IgniteLogger.forClass(IgniteUtils.class);
-
/** Version of the JDK. */
private static String jdkVer;
@@ -175,47 +167,4 @@ public class IgniteUtils {
return hash(val);
}
-
- /**
- * Serializes an object to byte array using native java serialization mechanism.
- *
- * @param obj Object to serialize.
- * @return Byte array.
- */
- public static byte[] toBytes(Object obj) {
- try (ByteArrayOutputStream bos = new ByteArrayOutputStream()) {
- try (ObjectOutputStream out = new ObjectOutputStream(bos)) {
-
- out.writeObject(obj);
-
- out.flush();
-
- return bos.toByteArray();
- }
- }
- catch (Exception e) {
- LOG.warn("Could not serialize a class [cls=" + obj.getClass().getName() + "]", e);
-
- return null;
- }
- }
-
- /**
- * Deserializes an object from byte array using native java serialization mechanism.
- *
- * @param bytes Byte array.
- * @return Object.
- */
- public static Object fromBytes(byte[] bytes) {
- try (ByteArrayInputStream bis = new ByteArrayInputStream(bytes)) {
- try (ObjectInputStream in = new ObjectInputStream(bis)) {
- return in.readObject();
- }
- }
- catch (Exception e) {
- LOG.warn("Could not deserialize an object", e);
-
- return null;
- }
- }
}
diff --git a/modules/core/src/main/java/org/apache/ignite/lang/ByteArray.java b/modules/core/src/main/java/org/apache/ignite/lang/ByteArray.java
new file mode 100644
index 0000000..c38be87
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/lang/ByteArray.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.lang;
+
+import java.nio.charset.StandardCharsets;
+import java.util.Arrays;
+import org.jetbrains.annotations.NotNull;
+
+/**
+ * A wrapper around a byte array that provides content-based {@code equals}, {@code hashCode} and ordering.
+ */
+public final class ByteArray implements Comparable<ByteArray> {
+ /** Byte-wise representation of the {@code ByteArray}. */
+ @NotNull
+ private final byte[] arr;
+
+ /**
+ * Constructs {@code ByteArray} instance from the given byte array. <em>Note:</em> copy of the given byte array will not be
+ * created in order to avoid redundant memory consumption, so this instance shares the array with the caller.
+ *
+ * @param arr Byte array. Can't be {@code null}.
+ */
+ public ByteArray(@NotNull byte[] arr) {
+ this.arr = arr;
+ }
+
+ /**
+ * Constructs {@code ByteArray} instance from the given string. {@link StandardCharsets#UTF_8} charset is used for
+ * encoding the input string.
+ *
+ * @param s The string {@code ByteArray} representation. Can't be {@code null}.
+ * @return {@code ByteArray} instance from the given string.
+ */
+ public static ByteArray fromString(@NotNull String s) {
+ return new ByteArray(s.getBytes(StandardCharsets.UTF_8));
+ }
+
+ /**
+ * Returns the {@code ByteArray} as byte array.
+ *
+ * @return The backing array itself (not a copy).
+ */
+ public byte[] bytes() {
+ return arr;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean equals(Object o) {
+ if (this == o) return true;
+
+ if (o == null || getClass() != o.getClass()) return false;
+
+ ByteArray byteArray = (ByteArray)o;
+
+ return Arrays.equals(arr, byteArray.arr);
+ }
+
+ /** {@inheritDoc} */
+ @Override public int hashCode() {
+ return Arrays.hashCode(arr);
+ }
+
+ /** {@inheritDoc} */
+ @Override public int compareTo(@NotNull ByteArray other) {
+ return Arrays.compare(this.arr, other.arr);
+ }
+
+ /**
+ * Compares two {@code ByteArray} values. The value returned is identical to what would be returned by:
+ * <pre>
+ * x.compareTo(y)
+ * </pre>
+ * <p>
+ * where x and y are {@code ByteArray}'s. Bytes are compared as signed values, see {@link Arrays#compare(byte[], byte[])}.
+ *
+ * @param x The first {@code ByteArray} to compare.
+ * @param y The second {@code ByteArray} to compare.
+ * @return the value {@code 0} if the first and second {@code ByteArray} are equal and contain the same elements in
+ * the same order; a value less than {@code 0} if the first {@code ByteArray} is lexicographically less than the
+ * second {@code ByteArray}; and a value greater than {@code 0} if the first {@code ByteArray} is lexicographically
+ * greater than the second {@code ByteArray}
+ */
+ public static int compare(@NotNull ByteArray x, @NotNull ByteArray y) {
+ return Arrays.compare(x.arr, y.arr);
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return new String(arr, StandardCharsets.UTF_8);
+ }
+}
diff --git a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgnitionImpl.java b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgnitionImpl.java
index 4c88f2f..2a8ef19 100644
--- a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgnitionImpl.java
+++ b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgnitionImpl.java
@@ -43,6 +43,7 @@ import org.apache.ignite.internal.storage.DistributedConfigurationStorage;
import org.apache.ignite.internal.storage.LocalConfigurationStorage;
import org.apache.ignite.internal.table.distributed.TableManager;
import org.apache.ignite.internal.vault.VaultManager;
+import org.apache.ignite.internal.vault.impl.VaultServiceImpl;
import org.apache.ignite.lang.IgniteLogger;
import org.apache.ignite.network.ClusterLocalConfiguration;
import org.apache.ignite.network.ClusterService;
@@ -83,7 +84,7 @@ public class IgnitionImpl implements Ignition {
ackBanner();
// Vault Component startup.
- VaultManager vaultMgr = new VaultManager();
+ VaultManager vaultMgr = new VaultManager(new VaultServiceImpl());
boolean cfgBootstrappedFromPds = vaultMgr.bootstrapped();
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
index 9f745d4..802d33b 100644
--- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
@@ -42,8 +42,9 @@ import org.apache.ignite.internal.table.TableSchemaView;
import org.apache.ignite.internal.table.distributed.raft.PartitionCommandListener;
import org.apache.ignite.internal.table.distributed.storage.InternalTableImpl;
import org.apache.ignite.internal.util.ArrayUtils;
-import org.apache.ignite.internal.util.IgniteUtils;
+import org.apache.ignite.internal.util.ByteUtils;
import org.apache.ignite.internal.vault.VaultManager;
+import org.apache.ignite.lang.ByteArray;
import org.apache.ignite.lang.IgniteLogger;
import org.apache.ignite.metastorage.common.Conditions;
import org.apache.ignite.metastorage.common.Key;
@@ -130,13 +131,12 @@ public class TableManager implements IgniteTables {
UUID tblId = UUID.fromString(placeholderValue);
try {
- String name = new String(vaultManager.get((INTERNAL_PREFIX + tblId.toString())
- .getBytes(StandardCharsets.UTF_8)).get().value(), StandardCharsets.UTF_8);
+ String name = new String(vaultManager.get(ByteArray.fromString(INTERNAL_PREFIX + tblId.toString())).get().value(), StandardCharsets.UTF_8);
int partitions = configurationMgr.configurationRegistry().getConfiguration(TablesConfiguration.KEY)
.tables().get(name).partitions().value();
- List<List<ClusterNode>> assignment = (List<List<ClusterNode>>)IgniteUtils.fromBytes(
+ List<List<ClusterNode>> assignment = (List<List<ClusterNode>>)ByteUtils.fromBytes(
evt.newEntry().value());
HashMap<Integer, RaftGroupService> partitionMap = new HashMap<>(partitions);
diff --git a/modules/vault/README.md b/modules/vault/README.md
new file mode 100644
index 0000000..3ee0b01
--- /dev/null
+++ b/modules/vault/README.md
@@ -0,0 +1,4 @@
+# Ignite vault module
+This module provides Vault API implementation.
+
+Note: will be filled later
diff --git a/modules/vault/pom.xml b/modules/vault/pom.xml
index 5f640f1..f290fef 100644
--- a/modules/vault/pom.xml
+++ b/modules/vault/pom.xml
@@ -37,5 +37,17 @@
<groupId>org.jetbrains</groupId>
<artifactId>annotations</artifactId>
</dependency>
+
+ <dependency>
+ <groupId>org.apache.ignite</groupId>
+ <artifactId>ignite-core</artifactId>
+ </dependency>
+
+ <!-- Test dependencies -->
+ <dependency>
+ <groupId>org.junit.jupiter</groupId>
+ <artifactId>junit-jupiter-engine</artifactId>
+ <scope>test</scope>
+ </dependency>
</dependencies>
</project>
diff --git a/modules/vault/src/main/java/org/apache/ignite/internal/vault/VaultManager.java b/modules/vault/src/main/java/org/apache/ignite/internal/vault/VaultManager.java
index cfc3fd0..92264c1 100644
--- a/modules/vault/src/main/java/org/apache/ignite/internal/vault/VaultManager.java
+++ b/modules/vault/src/main/java/org/apache/ignite/internal/vault/VaultManager.java
@@ -17,28 +17,163 @@
package org.apache.ignite.internal.vault;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
import java.util.concurrent.CompletableFuture;
-import org.apache.ignite.internal.vault.common.VaultEntry;
+import java.util.concurrent.ExecutionException;
+import org.apache.ignite.internal.util.ByteUtils;
+import org.apache.ignite.internal.vault.common.Entry;
+import org.apache.ignite.internal.vault.common.VaultWatch;
+import org.apache.ignite.internal.vault.service.VaultService;
+import org.apache.ignite.lang.ByteArray;
+import org.apache.ignite.lang.IgniteInternalCheckedException;
+import org.jetbrains.annotations.NotNull;
/**
- * VaultManager is responsible for handling VaultService lifecycle and providing interface for managing local keys.
+ * VaultManager is responsible for handling {@link VaultService} lifecycle
+ * and providing interface for managing local keys.
*/
public class VaultManager {
+ /** Special key for vault where applied revision for {@code putAll} operation is stored. */
+ private static final ByteArray APPLIED_REV = ByteArray.fromString("applied_revision");
+
+ /** Mutex guarding reads and updates of the applied revision. */
+ private final Object mux = new Object();
+
+ /** Instance of vault; assigned once in the constructor. */
+ private final VaultService vaultService;
+
+ /** Creates a vault manager that delegates to the given vault service.
+ *
+ * @param vaultService Instance of vault.
+ */
+ public VaultManager(VaultService vaultService) {
+ this.vaultService = vaultService;
+ }
/**
* @return {@code true} if VaultService beneath given VaultManager was bootstrapped with data
* either from PDS or from user initial bootstrap configuration.
+ *
+ * TODO: implement when IGNITE-14408 will be ready
*/
public boolean bootstrapped() {
return false;
}
- // TODO: IGNITE-14405 Local persistent key-value storage (Vault).
+ /**
+ * Delegates to {@link VaultService#get(ByteArray)}.
+ *
+ * @param key Key. Can't be {@code null}.
+ * @return Future of the entry for the given key, as returned by the vault service. Can't be {@code null}.
+ */
+ @NotNull public CompletableFuture<Entry> get(@NotNull ByteArray key) {
+ return vaultService.get(key);
+ }
+
+ /**
+ * Delegates to {@link VaultService#put(ByteArray, byte[])}.
+ *
+ * @param key Vault key. Can't be {@code null}.
+ * @param val Value. Can't be {@code null}.
+ * @return Future representing pending completion of the operation.
+ */
+ @NotNull public CompletableFuture<Void> put(@NotNull ByteArray key, @NotNull byte[] val) {
+ return vaultService.put(key, val);
+ }
+
+ /**
+ * Delegates to {@link VaultService#remove(ByteArray)}.
+ *
+ * @param key Vault key. Can't be {@code null}.
+ * @return Future representing pending completion of the operation.
+ */
+ @NotNull public CompletableFuture<Void> remove(@NotNull ByteArray key) {
+ return vaultService.remove(key);
+ }
+
+ /**
+ * Delegates to {@link VaultService#range(ByteArray, ByteArray)}.
+ *
+ * @param fromKey Start key of range (inclusive). Can't be {@code null}.
+ * @param toKey End key of range (exclusive). NOTE(review): previously documented as nullable, but annotated {@code @NotNull} - confirm.
+ * @return Iterator built upon entries corresponding to the given range.
+ */
+ @NotNull public Iterator<Entry> range(@NotNull ByteArray fromKey, @NotNull ByteArray toKey) {
+ return vaultService.range(fromKey, toKey);
+ }
+
+ /**
+ * Inserts or updates entries with the given keys and values and stores {@code revision} as the applied revision in the same batch.
+ *
+ * @param vals The map of keys and corresponding values. Can't be {@code null}. Documented as non-empty, but emptiness is not checked here.
+ * @param revision Revision for entries. Must not be less than the currently applied revision (0 if none is stored).
+ * @return Future representing pending completion of the operation.
+ * @throws IgniteInternalCheckedException If revision is less than the applied revision from vault or
+ * if couldn't get applied revision from vault.
+ */
+ public CompletableFuture<Void> putAll(@NotNull Map<ByteArray, byte[]> vals, long revision) throws IgniteInternalCheckedException {
+ synchronized (mux) {
+ byte[] appliedRevBytes;
+
+ try {
+ appliedRevBytes = vaultService.get(APPLIED_REV).get().value();
+ }
+ catch (InterruptedException | ExecutionException e) { // NOTE(review): the interrupt flag is not restored on InterruptedException.
+ throw new IgniteInternalCheckedException("Error occurred when getting applied revision", e);
+ }
+
+ long appliedRevision = appliedRevBytes != null ? ByteUtils.bytesToLong(appliedRevBytes, 0) : 0L;
+
+ if (revision < appliedRevision)
+ throw new IgniteInternalCheckedException("Inconsistency between applied revision from vault and the current revision");
+
+ HashMap<ByteArray, byte[]> mergedMap = new HashMap<>(vals); // Copy, so the caller's map is not modified.
+
+ mergedMap.put(APPLIED_REV, ByteUtils.longToBytes(revision));
+
+ return vaultService.putAll(mergedMap); // NOTE(review): the future is returned without being awaited under the mutex.
+ }
+
+ }
+
+ /**
+ * @return Applied revision for {@link VaultManager#putAll} operation, or {@code 0} if none is stored in vault.
+ * @throws IgniteInternalCheckedException If couldn't get applied revision from vault.
+ */
+ @NotNull public Long appliedRevision() throws IgniteInternalCheckedException {
+ byte[] appliedRevision;
+
+ synchronized (mux) {
+ try {
+ appliedRevision = vaultService.get(APPLIED_REV).get().value();
+ }
+ catch (InterruptedException | ExecutionException e) { // NOTE(review): the interrupt flag is not restored on InterruptedException.
+ throw new IgniteInternalCheckedException("Error occurred when getting applied revision", e);
+ }
+
+ return appliedRevision == null ? 0L : ByteUtils.bytesToLong(appliedRevision, 0);
+ }
+ }
+
+ /**
+ * Delegates to {@link VaultService#watch(VaultWatch)}.
+ *
+ * @param vaultWatch Watch which will notify for each update.
+ * @return Future of the subscription identifier. Could be used in {@link #stopWatch} method in order to cancel subscription.
+ */
+ @NotNull public CompletableFuture<Long> watch(@NotNull VaultWatch vaultWatch) {
+ return vaultService.watch(vaultWatch);
+ }
/**
- * This is a proxy to Vault service method.
+ * See {@link VaultService#stopWatch(Long)}
+ *
+ * @param id Subscription identifier.
+ * @return Completed future in case of operation success. Couldn't be {@code null}.
*/
- public CompletableFuture<VaultEntry> get(byte[] key) {
- return null;
+ @NotNull public CompletableFuture<Void> stopWatch(@NotNull Long id) {
+ return vaultService.stopWatch(id);
}
}
diff --git a/modules/vault/src/main/java/org/apache/ignite/internal/vault/common/VaultEntry.java b/modules/vault/src/main/java/org/apache/ignite/internal/vault/common/Entry.java
similarity index 61%
rename from modules/vault/src/main/java/org/apache/ignite/internal/vault/common/VaultEntry.java
rename to modules/vault/src/main/java/org/apache/ignite/internal/vault/common/Entry.java
index d35e245..3d4566e 100644
--- a/modules/vault/src/main/java/org/apache/ignite/internal/vault/common/VaultEntry.java
+++ b/modules/vault/src/main/java/org/apache/ignite/internal/vault/common/Entry.java
@@ -17,45 +17,51 @@
package org.apache.ignite.internal.vault.common;
-import java.io.Serializable;
+import org.apache.ignite.lang.ByteArray;
import org.jetbrains.annotations.NotNull;
+import org.jetbrains.annotations.Nullable;
/**
- * Representation of vault entry.
+ * Represents a vault unit as entry with key and value, where
+ * <ul>
+ * <li>key - an unique entry's key. Keys are comparable in lexicographic manner and represented as an {@link ByteArray}.</li>
+ * <li>value - a data which is associated with a key and represented as an array of bytes.</li>
+ * </ul>
*/
-public class VaultEntry implements Serializable {
+// TODO: need to generify with metastorage Entry https://issues.apache.org/jira/browse/IGNITE-14653
+public final class Entry {
/** Key. */
- private byte[] key;
+ private final ByteArray key;
/** Value. */
- private byte[] val;
+ private final byte[] val;
/**
* Constructs {@code VaultEntry} instance from the given key and value.
*
- * @param key Key as a {@code ByteArray}.
+ * @param key Key as a {@code ByteArray}. Couldn't be null.
* @param val Value as a {@code byte[]}.
*/
- public VaultEntry(byte[] key, byte[] val) {
+ public Entry(@NotNull ByteArray key, byte[] val) {
this.key = key;
this.val = val;
}
/**
- * Gets a key bytes.
+ * Returns a {@code ByteArray}.
*
- * @return Byte array.
+ * @return The {@code ByteArray}.
*/
- public @NotNull byte[] key() {
+ @NotNull public ByteArray key() {
return key;
}
/**
- * Gets a value bytes.
+ * Returns a value. Could be {@code null} for empty entry.
*
- * @return Byte array.
+ * @return Value.
*/
- public @NotNull byte[] value() {
+ @Nullable public byte[] value() {
return val;
}
diff --git a/modules/vault/src/main/java/org/apache/ignite/internal/vault/VaultManager.java b/modules/vault/src/main/java/org/apache/ignite/internal/vault/common/VaultListener.java
similarity index 51%
copy from modules/vault/src/main/java/org/apache/ignite/internal/vault/VaultManager.java
copy to modules/vault/src/main/java/org/apache/ignite/internal/vault/common/VaultListener.java
index cfc3fd0..1938a9e 100644
--- a/modules/vault/src/main/java/org/apache/ignite/internal/vault/VaultManager.java
+++ b/modules/vault/src/main/java/org/apache/ignite/internal/vault/common/VaultListener.java
@@ -15,30 +15,29 @@
* limitations under the License.
*/
-package org.apache.ignite.internal.vault;
+package org.apache.ignite.internal.vault.common;
-import java.util.concurrent.CompletableFuture;
-import org.apache.ignite.internal.vault.common.VaultEntry;
+import org.jetbrains.annotations.NotNull;
/**
- * VaultManager is responsible for handling VaultService lifecycle and providing interface for managing local keys.
+ * Vault storage listener for changes.
*/
-public class VaultManager {
-
+//TODO: need to generify with metastorage WatchListener https://issues.apache.org/jira/browse/IGNITE-14653
+public interface VaultListener {
/**
- * @return {@code true} if VaultService beneath given VaultManager was bootstrapped with data
- * either from PDS or from user initial bootstrap configuration.
+ * The method will be called on each vault update.
+ *
+ * @param entries A single entry or a batch.
+ * @return {@code True} if listener must continue event handling. If returns {@code false} then the listener and
+ * corresponding watch will be unregistered.
*/
- public boolean bootstrapped() {
- return false;
- }
-
- // TODO: IGNITE-14405 Local persistent key-value storage (Vault).
+ boolean onUpdate(@NotNull Iterable<Entry> entries);
/**
- * This is a proxy to Vault service method.
+ * The method will be called in case of an error occurred. The listener and corresponding watch will be
+ * unregistered.
+ *
+ * @param e Exception.
*/
- public CompletableFuture<VaultEntry> get(byte[] key) {
- return null;
- }
+ void onError(@NotNull Throwable e);
}
diff --git a/modules/vault/src/main/java/org/apache/ignite/internal/vault/common/VaultWatch.java b/modules/vault/src/main/java/org/apache/ignite/internal/vault/common/VaultWatch.java
new file mode 100644
index 0000000..51b2ca8
--- /dev/null
+++ b/modules/vault/src/main/java/org/apache/ignite/internal/vault/common/VaultWatch.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.vault.common;
+
+import java.util.Collections;
+import java.util.Comparator;
+import org.apache.ignite.lang.ByteArray;
+
+/**
+ * Watch for vault entries.
+ * Can be restricted to a range of keys.
+ * If the value of a key in the range is changed, then the corresponding listener will be triggered.
+ */
+public final class VaultWatch {
+ /** Comparator for {@code ByteArray} values. */
+ private static final Comparator<ByteArray> CMP = ByteArray::compare;
+
+ /**
+ * Start key of range (inclusive); {@code null} means no lower bound.
+ * If the value of a key in the range is changed, then the corresponding listener will be triggered.
+ */
+ private final ByteArray startKey;
+
+ /**
+ * End key of range (exclusive); {@code null} means no upper bound.
+ * If the value of a key in the range is changed, then the corresponding listener will be triggered.
+ */
+ private final ByteArray endKey;
+
+ /** Listener for vault's values updates. */
+ private final VaultListener listener;
+
+ /**
+ * @param startKey Start key of range (inclusive). {@code null} means no lower bound.
+ * @param endkey End key of range (exclusive). {@code null} means no upper bound.
+ * @param listener Listener.
+ */
+ public VaultWatch(ByteArray startKey, ByteArray endkey, VaultListener listener) {
+ this.startKey = startKey;
+ this.endKey = endkey;
+ this.listener = listener;
+ }
+
+ /**
+ * Notifies the listener if the key of {@code val} is in range; entries outside the range are ignored and {@code true} is returned.
+ *
+ * @param val Vault entry.
+ * @return {@code True} if watch must continue event handling according to corresponding listener logic. If returns
+ * {@code false} then the listener and corresponding watch will be unregistered.
+ */
+ public boolean notify(Entry val) {
+ if (startKey != null && CMP.compare(val.key(), startKey) < 0)
+ return true;
+
+ if (endKey != null && CMP.compare(val.key(), endKey) >= 0)
+ return true;
+
+ return listener.onUpdate(Collections.singleton(val));
+ }
+
+ /**
+ * The method will be called in case of an error occurred. The watch and corresponding listener will be
+ * unregistered. The exception is passed on to the listener's {@code onError}.
+ *
+ * @param e Exception.
+ */
+ public void onError(Throwable e) {
+ listener.onError(e);
+ }
+}
diff --git a/modules/vault/src/main/java/org/apache/ignite/internal/vault/VaultManager.java b/modules/vault/src/main/java/org/apache/ignite/internal/vault/common/Watcher.java
similarity index 56%
copy from modules/vault/src/main/java/org/apache/ignite/internal/vault/VaultManager.java
copy to modules/vault/src/main/java/org/apache/ignite/internal/vault/common/Watcher.java
index cfc3fd0..a5623c0 100644
--- a/modules/vault/src/main/java/org/apache/ignite/internal/vault/VaultManager.java
+++ b/modules/vault/src/main/java/org/apache/ignite/internal/vault/common/Watcher.java
@@ -15,30 +15,36 @@
* limitations under the License.
*/
-package org.apache.ignite.internal.vault;
+package org.apache.ignite.internal.vault.common;
import java.util.concurrent.CompletableFuture;
-import org.apache.ignite.internal.vault.common.VaultEntry;
+import org.jetbrains.annotations.NotNull;
/**
- * VaultManager is responsible for handling VaultService lifecycle and providing interface for managing local keys.
+ * Vault watcher.
+ *
+ * Watches for vault entries updates.
*/
-public class VaultManager {
-
+public interface Watcher {
/**
- * @return {@code true} if VaultService beneath given VaultManager was bootstrapped with data
- * either from PDS or from user initial bootstrap configuration.
+ * Registers a watch for updates of vault entries.
+ *
+ * @param vaultWatch Vault watch.
+ * @return Future that completes with the id of the registered watch; the id is accepted by {@link #cancel(Long)}.
*/
- public boolean bootstrapped() {
- return false;
- }
+ CompletableFuture<Long> register(@NotNull VaultWatch vaultWatch);
- // TODO: IGNITE-14405 Local persistent key-value storage (Vault).
+ /**
+ * Notifies the watcher that the vault entry {@code val} was changed.
+ *
+ * @param val Changed vault entry.
+ */
+ void notify(@NotNull Entry val);
/**
- * This is a proxy to Vault service method.
+ * Cancels the watch with the specified {@code id}.
+ *
+ * @param id Id of the watch, as returned by {@link #register(VaultWatch)}.
*/
- public CompletableFuture<VaultEntry> get(byte[] key) {
- return null;
- }
+ void cancel(@NotNull Long id);
}
diff --git a/modules/vault/src/main/java/org/apache/ignite/internal/vault/common/WatcherImpl.java b/modules/vault/src/main/java/org/apache/ignite/internal/vault/common/WatcherImpl.java
new file mode 100644
index 0000000..90187b3
--- /dev/null
+++ b/modules/vault/src/main/java/org/apache/ignite/internal/vault/common/WatcherImpl.java
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.vault.common;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+import org.apache.ignite.lang.IgniteInternalCheckedException;
+import org.apache.ignite.lang.IgniteLogger;
+import org.jetbrains.annotations.NotNull;
+
+/**
+ * Implementation of vault {@link Watcher}.
+ * <p>
+ * Changed entries are queued by {@link #notify(Entry)} and delivered to the registered watches by a single
+ * worker thread that is started in the constructor.
+ */
+public class WatcherImpl implements Watcher {
+    /** Logger. */
+    private static final IgniteLogger LOG = IgniteLogger.forClass(WatcherImpl.class);
+
+    /** Queue for changed vault entries. */
+    private final BlockingQueue<Entry> queue = new LinkedBlockingQueue<>();
+
+    /** Registered vault watches. Accessed only while holding {@link #mux}. */
+    private final Map<Long, VaultWatch> watches = new HashMap<>();
+
+    /** Flag for indicating if watcher is stopped. */
+    private volatile boolean stop;
+
+    /** Counter for generating ids for watches. */
+    private final AtomicLong watchIds;
+
+    /** Mutex. */
+    private final Object mux = new Object();
+
+    /** Execution service which runs thread for processing changed vault entries. */
+    private final ExecutorService exec;
+
+    /**
+     * Default constructor. Starts the worker that processes changed vault entries.
+     */
+    public WatcherImpl() {
+        watchIds = new AtomicLong(0);
+
+        exec = Executors.newFixedThreadPool(1);
+
+        exec.execute(new WatcherWorker());
+    }
+
+    /** {@inheritDoc} */
+    @Override public CompletableFuture<Long> register(@NotNull VaultWatch vaultWatch) {
+        synchronized (mux) {
+            Long key = watchIds.incrementAndGet();
+
+            watches.put(key, vaultWatch);
+
+            return CompletableFuture.completedFuture(key);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public void notify(@NotNull Entry val) {
+        queue.offer(val);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void cancel(@NotNull Long id) {
+        synchronized (mux) {
+            watches.remove(id);
+        }
+    }
+
+    /**
+     * Shutdowns watcher: raises the stop flag, interrupts the worker thread and waits until the executor
+     * service is terminated.
+     */
+    public void shutdown() {
+        // The flag is raised before the interruption, so the interrupted worker leaves its loop.
+        stop = true;
+
+        List<Runnable> tasks = exec.shutdownNow();
+
+        if (!tasks.isEmpty())
+            LOG.warn("Runnable tasks outlived thread pool executor service");
+
+        try {
+            exec.awaitTermination(Long.MAX_VALUE, TimeUnit.MILLISECONDS);
+        }
+        catch (InterruptedException ignored) {
+            LOG.warn("Got interrupted while waiting for executor service to stop.");
+
+            exec.shutdownNow();
+
+            // Preserve interrupt status.
+            Thread.currentThread().interrupt();
+        }
+    }
+
+    /**
+     * Worker that polls changed vault entries from queue and notifies registered watches.
+     */
+    private class WatcherWorker implements Runnable {
+        /** {@inheritDoc} */
+        @Override public void run() {
+            while (!stop) {
+                try {
+                    Entry val = queue.take();
+
+                    synchronized (mux) {
+                        // A watch whose listener returns false must be unregistered. Removal goes through the
+                        // collection view: calling cancel() from inside Map.forEach() modifies the HashMap
+                        // during iteration and ends with ConcurrentModificationException, killing the worker.
+                        watches.values().removeIf(w -> !w.notify(val));
+                    }
+                }
+                catch (InterruptedException interruptedException) {
+                    synchronized (mux) {
+                        // The exception itself is passed as the cause: InterruptedException normally has no
+                        // cause of its own, so getCause() would hand null to the listeners.
+                        for (VaultWatch w : watches.values()) {
+                            w.onError(
+                                new IgniteInternalCheckedException(
+                                    "Error occurred during watches handling ",
+                                    interruptedException));
+                        }
+
+                        // Every watch has been notified about the error, so all of them are unregistered.
+                        watches.clear();
+                    }
+                }
+            }
+        }
+    }
+}
diff --git a/modules/vault/src/main/java/org/apache/ignite/internal/vault/impl/VaultServiceImpl.java b/modules/vault/src/main/java/org/apache/ignite/internal/vault/impl/VaultServiceImpl.java
new file mode 100644
index 0000000..f9b6056
--- /dev/null
+++ b/modules/vault/src/main/java/org/apache/ignite/internal/vault/impl/VaultServiceImpl.java
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.vault.impl;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.NavigableMap;
+import java.util.TreeMap;
+import java.util.concurrent.CompletableFuture;
+import org.apache.ignite.internal.vault.common.Entry;
+import org.apache.ignite.internal.vault.common.VaultWatch;
+import org.apache.ignite.internal.vault.common.WatcherImpl;
+import org.apache.ignite.internal.vault.service.VaultService;
+import org.apache.ignite.lang.ByteArray;
+import org.jetbrains.annotations.NotNull;
+
+/**
+ * Simple in-memory representation of vault. Only for test purposes.
+ */
+public class VaultServiceImpl implements VaultService {
+    /** Map to store values. Accessed only while holding {@link #mux}. */
+    private final NavigableMap<ByteArray, byte[]> storage;
+
+    /** Mutex. */
+    private final Object mux = new Object();
+
+    /** Watcher that is notified about every value written by {@link #put(ByteArray, byte[])}. */
+    private final WatcherImpl watcher;
+
+    /**
+     * Creates an empty in-memory vault together with its watcher.
+     */
+    public VaultServiceImpl() {
+        this.watcher = new WatcherImpl();
+        this.storage = new TreeMap<>();
+    }
+
+    /** {@inheritDoc} */
+    @Override @NotNull public CompletableFuture<Entry> get(@NotNull ByteArray key) {
+        synchronized (mux) {
+            // For a missing key the map returns null, so the resulting entry carries a null value.
+            return CompletableFuture.completedFuture(new Entry(key, storage.get(key)));
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override @NotNull public CompletableFuture<Void> put(@NotNull ByteArray key, @NotNull byte[] val) {
+        synchronized (mux) {
+            storage.put(key, val);
+
+            watcher.notify(new Entry(key, val));
+
+            return CompletableFuture.completedFuture(null);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override @NotNull public CompletableFuture<Void> remove(@NotNull ByteArray key) {
+        synchronized (mux) {
+            // Note: the watcher is not notified about removals.
+            storage.remove(key);
+
+            return CompletableFuture.completedFuture(null);
+        }
+    }
+
+    /** {@inheritDoc} */
+    //TODO: use Cursor instead of Iterator https://issues.apache.org/jira/browse/IGNITE-14654
+    @Override @NotNull public Iterator<Entry> range(@NotNull ByteArray fromKey, @NotNull ByteArray toKey) {
+        synchronized (mux) {
+            ArrayList<Entry> res = new ArrayList<>();
+
+            // The result is a snapshot: entries are built from a new key wrapper and a copy of the value.
+            for (Map.Entry<ByteArray, byte[]> e : storage.subMap(fromKey, toKey).entrySet())
+                res.add(new Entry(new ByteArray(e.getKey().bytes()), e.getValue().clone()));
+
+            return res.iterator();
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public @NotNull CompletableFuture<Long> watch(@NotNull VaultWatch vaultWatch) {
+        // The storage mutex is intentionally not taken here. The watcher synchronizes on its own mutex and
+        // holds it while listeners run, so acquiring the storage mutex first could deadlock with a listener
+        // that reads the vault.
+        return watcher.register(vaultWatch);
+    }
+
+    /** {@inheritDoc} */
+    @Override public @NotNull CompletableFuture<Void> stopWatch(@NotNull Long id) {
+        // No storage mutex here for the same lock-ordering reason as in watch().
+        watcher.cancel(id);
+
+        return CompletableFuture.completedFuture(null);
+    }
+
+    /** {@inheritDoc} */
+    @Override public @NotNull CompletableFuture<Void> putAll(@NotNull Map<ByteArray, byte[]> vals) {
+        synchronized (mux) {
+            // Note: the watcher is not notified about entries written in bulk.
+            storage.putAll(vals);
+
+            return CompletableFuture.completedFuture(null);
+        }
+    }
+}
diff --git a/modules/vault/src/main/java/org/apache/ignite/internal/vault/service/VaultService.java b/modules/vault/src/main/java/org/apache/ignite/internal/vault/service/VaultService.java
new file mode 100644
index 0000000..247cf4f
--- /dev/null
+++ b/modules/vault/src/main/java/org/apache/ignite/internal/vault/service/VaultService.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.vault.service;
+
+import java.util.Iterator;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import org.apache.ignite.internal.vault.common.Entry;
+import org.apache.ignite.internal.vault.common.VaultWatch;
+import org.apache.ignite.lang.ByteArray;
+import org.jetbrains.annotations.NotNull;
+
+/**
+ * Defines the interface for accessing a vault service.
+ */
+// TODO: need to generify with MetastorageService https://issues.apache.org/jira/browse/IGNITE-14653
+public interface VaultService {
+ /**
+ * Retrieves an entry for the given key.
+ *
+ * @param key Key. Cannot be {@code null}.
+ * @return Future that completes with an entry for the given key. Cannot be {@code null}.
+ */
+ @NotNull CompletableFuture<Entry> get(@NotNull ByteArray key);
+
+ /**
+ * Writes the value with the given key to the vault.
+ *
+ * @param key Vault key. Cannot be {@code null}.
+ * @param val Value. Cannot be {@code null}.
+ * @return Future representing pending completion of the operation.
+ */
+ @NotNull CompletableFuture<Void> put(@NotNull ByteArray key, @NotNull byte[] val);
+
+ /**
+ * Removes the value with the given key from the vault.
+ *
+ * @param key Vault key. Cannot be {@code null}.
+ * @return Future representing pending completion of the operation.
+ */
+ @NotNull CompletableFuture<Void> remove(@NotNull ByteArray key);
+
+ /**
+ * Returns a view of the portion of the vault whose keys range from fromKey, inclusive, to toKey, exclusive.
+ *
+ * @param fromKey Start key of range (inclusive). Cannot be {@code null}.
+ * @param toKey End key of range (exclusive). Cannot be {@code null}.
+ * @return Iterator built upon entries corresponding to the given range.
+ */
+ @NotNull Iterator<Entry> range(@NotNull ByteArray fromKey, @NotNull ByteArray toKey);
+
+ /**
+ * Subscribes to vault storage updates using the given watch.
+ *
+ * @param vaultWatch Watch which will be notified about updates.
+ * @return Future that completes with the subscription identifier. The identifier can be passed to
+ * {@link #stopWatch} in order to cancel the subscription.
+ */
+ @NotNull CompletableFuture<Long> watch(@NotNull VaultWatch vaultWatch);
+
+ /**
+ * Cancels the subscription with the given identifier.
+ *
+ * @param id Subscription identifier.
+ * @return Completed future in case of operation success. Cannot be {@code null}.
+ */
+ @NotNull CompletableFuture<Void> stopWatch(@NotNull Long id);
+
+ /**
+ * Inserts or updates entries with the given keys and the given values.
+ *
+ * @param vals The map of keys and corresponding values. Cannot be {@code null} or empty.
+ * @return Completed future.
+ */
+ @NotNull CompletableFuture<Void> putAll(@NotNull Map<ByteArray, byte[]> vals);
+}
diff --git a/modules/vault/src/test/java/org/apache/ignite/internal/vault/impl/VaultBaseContractsTest.java b/modules/vault/src/test/java/org/apache/ignite/internal/vault/impl/VaultBaseContractsTest.java
new file mode 100644
index 0000000..3a11a4d
--- /dev/null
+++ b/modules/vault/src/test/java/org/apache/ignite/internal/vault/impl/VaultBaseContractsTest.java
@@ -0,0 +1,236 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.vault.impl;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import org.apache.ignite.internal.vault.VaultManager;
+import org.apache.ignite.internal.vault.common.Entry;
+import org.apache.ignite.internal.vault.common.VaultListener;
+import org.apache.ignite.internal.vault.common.VaultWatch;
+import org.apache.ignite.lang.ByteArray;
+import org.apache.ignite.lang.IgniteInternalCheckedException;
+import org.jetbrains.annotations.NotNull;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import static org.junit.jupiter.api.Assertions.assertArrayEquals;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/**
+ * Test for base vault manager contracts.
+ */
+public class VaultBaseContractsTest {
+    /** Vault. */
+    private VaultManager vaultManager;
+
+    /**
+     * Instantiate vault.
+     */
+    @BeforeEach
+    public void setUp() {
+        vaultManager = new VaultManager(new VaultServiceImpl());
+    }
+
+    /**
+     * put contract: a written value can be read back, also after it is written again.
+     */
+    @Test
+    public void put() throws ExecutionException, InterruptedException {
+        ByteArray key = getKey(1);
+        byte[] val = getValue(key, 1);
+
+        assertNull(vaultManager.get(key).get().value());
+
+        vaultManager.put(key, val);
+
+        Entry v = vaultManager.get(key).get();
+
+        assertFalse(v.empty());
+        // Byte arrays are compared by content: assertEquals would only compare the references.
+        assertArrayEquals(val, v.value());
+
+        vaultManager.put(key, val);
+
+        v = vaultManager.get(key).get();
+
+        assertFalse(v.empty());
+        assertArrayEquals(val, v.value());
+    }
+
+    /**
+     * remove contract: removing a missing key is a no-op, removing an existing key drops its value.
+     */
+    @Test
+    public void remove() throws ExecutionException, InterruptedException {
+        ByteArray key = getKey(1);
+        byte[] val = getValue(key, 1);
+
+        assertNull(vaultManager.get(key).get().value());
+
+        // Remove non-existent value.
+        vaultManager.remove(key);
+
+        assertNull(vaultManager.get(key).get().value());
+
+        vaultManager.put(key, val);
+
+        Entry v = vaultManager.get(key).get();
+
+        assertFalse(v.empty());
+        assertArrayEquals(val, v.value());
+
+        // Remove existent value.
+        vaultManager.remove(key);
+
+        v = vaultManager.get(key).get();
+
+        assertNull(v.value());
+    }
+
+    /**
+     * range contract: the start key is inclusive, the end key is exclusive, entries come in key order.
+     */
+    @Test
+    public void range() throws ExecutionException, InterruptedException {
+        ByteArray key;
+
+        Map<ByteArray, byte[]> values = new HashMap<>();
+
+        for (int i = 0; i < 10; i++) {
+            key = getKey(i);
+
+            values.put(key, getValue(key, i));
+
+            assertNull(vaultManager.get(key).get().value());
+        }
+
+        values.forEach((k, v) -> vaultManager.put(k, v));
+
+        for (Map.Entry<ByteArray, byte[]> entry : values.entrySet())
+            assertArrayEquals(entry.getValue(), vaultManager.get(entry.getKey()).get().value());
+
+        Iterator<Entry> it = vaultManager.range(getKey(3), getKey(7));
+
+        List<Entry> rangeRes = new ArrayList<>();
+
+        it.forEachRemaining(rangeRes::add);
+
+        assertEquals(4, rangeRes.size());
+
+        //Check that we have exact range from "key3" to "key6"
+        for (int i = 3; i < 7; i++)
+            assertArrayEquals(values.get(getKey(i)), rangeRes.get(i - 3).value());
+    }
+
+    /**
+     * watch contract: the listener is triggered for every update of a key inside the watched range.
+     */
+    @Test
+    public void watch() throws ExecutionException, InterruptedException {
+        ByteArray key;
+
+        Map<ByteArray, byte[]> values = new HashMap<>();
+
+        for (int i = 0; i < 10; i++) {
+            key = getKey(i);
+
+            values.put(key, getValue(key, i));
+        }
+
+        values.forEach((k, v) -> vaultManager.put(k, v));
+
+        for (Map.Entry<ByteArray, byte[]> entry : values.entrySet())
+            assertArrayEquals(entry.getValue(), vaultManager.get(entry.getKey()).get().value());
+
+        CountDownLatch counter = new CountDownLatch(4);
+
+        VaultWatch vaultWatch = new VaultWatch(getKey(3), getKey(7), new VaultListener() {
+            @Override public boolean onUpdate(@NotNull Iterable<Entry> entries) {
+                counter.countDown();
+
+                return true;
+            }
+
+            @Override public void onError(@NotNull Throwable e) {
+                // no-op
+            }
+        });
+
+        vaultManager.watch(vaultWatch);
+
+        for (int i = 3; i < 7; i++)
+            vaultManager.put(getKey(i), ("new" + i).getBytes());
+
+        // Watches are notified asynchronously by a worker thread, so a generous timeout keeps the test stable.
+        // await() returns as soon as the latch reaches zero, hence a passing run does not wait longer.
+        assertTrue(counter.await(10, TimeUnit.SECONDS));
+    }
+
+    /**
+     * putAll contract: all entries are written and the passed revision becomes the applied revision.
+     */
+    @Test
+    public void putAllAndRevision() throws ExecutionException, InterruptedException, IgniteInternalCheckedException {
+        Map<ByteArray, byte[]> entries = new HashMap<>();
+
+        int entriesNum = 100;
+
+        for (int i = 0; i < entriesNum; i++) {
+            ByteArray key = getKey(i);
+
+            entries.put(key, getValue(key, i));
+        }
+
+        for (int i = 0; i < entriesNum; i++) {
+            ByteArray key = getKey(i);
+
+            assertNull(vaultManager.get(key).get().value());
+        }
+
+        vaultManager.putAll(entries, 1L);
+
+        for (int i = 0; i < entriesNum; i++) {
+            ByteArray key = getKey(i);
+
+            assertArrayEquals(entries.get(key), vaultManager.get(key).get().value());
+        }
+
+        assertEquals(1L, vaultManager.appliedRevision());
+    }
+
+    /**
+     * Creates key for vault entry.
+     */
+    private static ByteArray getKey(int k) {
+        return ByteArray.fromString("key" + k);
+    }
+
+    /**
+     * Creates value represented by byte array.
+     */
+    private static byte[] getValue(ByteArray k, int v) {
+        return ("key" + k + '_' + "val" + v).getBytes();
+    }
+}