You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by ib...@apache.org on 2021/07/22 10:10:08 UTC
[ignite-3] branch main updated: IGNITE-14982 RocksDB-based
persistent key-value storage for meta store (#203)
This is an automated email from the ASF dual-hosted git repository.
ibessonov pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new 179d24d IGNITE-14982 RocksDB-based persistent key-value storage for meta store (#203)
179d24d is described below
commit 179d24de331d6b2d88846daedb779672b4667627
Author: Semyon Danilov <sa...@yandex.ru>
AuthorDate: Thu Jul 22 13:10:02 2021 +0300
IGNITE-14982 RocksDB-based persistent key-value storage for meta store (#203)
---
.../apache/ignite/internal/util/ArrayUtils.java | 2 +-
.../ignite/internal/util/CollectionUtils.java | 2 +-
.../apache/ignite/internal/util/IgniteUtils.java | 22 +-
.../testframework/WorkDirectoryExtension.java | 9 +-
modules/metastorage-client/pom.xml | 7 +
.../ITMetaStorageServicePersistenceTest.java | 426 ++++
.../client/ITMetaStorageServiceTest.java | 29 +-
modules/metastorage-server/pom.xml | 12 +
.../metastorage/server/KeyValueStorage.java | 23 +-
.../server/SimpleInMemoryKeyValueStorage.java | 21 +-
.../ignite/internal/metastorage/server/Value.java | 2 +-
.../server/persistence/ColumnFamily.java | 158 ++
.../server/persistence/RangeCursor.java | 184 ++
.../server/persistence/RocksDBKeyValueStorage.java | 1092 +++++++++++
.../server/persistence/RocksStorageUtils.java | 267 +++
.../persistence/StorageColumnFamilyType.java | 53 +
.../server/persistence/WatchCursor.java | 233 +++
.../server/raft/MetaStorageListener.java | 32 +-
...eTest.java => AbstractKeyValueStorageTest.java} | 164 +-
.../server/RocksDbKeyValueStorageTest.java | 39 +
.../server/SimpleInMemoryKeyValueStorageTest.java | 2032 +-------------------
.../raft/client/service/RaftGroupListener.java | 10 +-
.../apache/ignite/raft/server/CounterListener.java | 13 +-
.../raft/server/ITJRaftCounterServerTest.java | 4 +-
.../internal/raft/server/impl/JRaftServerImpl.java | 10 +-
.../table/distributed/raft/PartitionListener.java | 10 +-
26 files changed, 2724 insertions(+), 2132 deletions(-)
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/ArrayUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/util/ArrayUtils.java
index 1f2e9f3..a33cf5a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/ArrayUtils.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/ArrayUtils.java
@@ -20,7 +20,6 @@ package org.apache.ignite.internal.util;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
-
import org.apache.ignite.lang.IgniteInternalException;
import org.jetbrains.annotations.Nullable;
@@ -272,6 +271,7 @@ public final class ArrayUtils {
*
* @param arr Array.
* @param obj One or more elements.
+ * @param <T> Type of the elements of the array.
* @return Concatenated array.
*/
@SafeVarargs
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/CollectionUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/util/CollectionUtils.java
index 4273ba4..21a7899 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/CollectionUtils.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/CollectionUtils.java
@@ -20,7 +20,6 @@ package org.apache.ignite.internal.util;
import java.util.Collection;
import java.util.List;
import java.util.Map;
-
import org.jetbrains.annotations.Nullable;
/**
@@ -61,6 +60,7 @@ public final class CollectionUtils {
* Gets first element from given list or returns {@code null} if list is empty.
*
* @param list List to retrieve the first element.
+ * @param <T> Type of the elements of the list.
* @return The first element of the given list or {@code null} in case the list is empty.
*/
public static <T> T first(List<? extends T> list) {
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
index 8e8af1a..f5ea3c3 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
@@ -24,6 +24,7 @@ import java.nio.file.NoSuchFileException;
import java.nio.file.Path;
import java.nio.file.SimpleFileVisitor;
import java.nio.file.attribute.BasicFileAttributes;
+import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.LinkedHashMap;
@@ -371,6 +372,9 @@ public class IgniteUtils {
try {
Files.walkFileTree(path, new SimpleFileVisitor<>() {
@Override public FileVisitResult postVisitDirectory(Path dir, IOException exc) throws IOException {
+ if (exc != null)
+ throw exc;
+
Files.delete(dir);
return FileVisitResult.CONTINUE;
@@ -449,14 +453,16 @@ public class IgniteUtils {
* thrown exception will be propagated to the caller, after all other objects are closed, similar to
* the try-with-resources block.
*
- * @param closeables collection of objects to close
+ * @param closeables Collection of objects to close.
+ * @throws Exception If failed to close.
*/
public static void closeAll(Collection<? extends AutoCloseable> closeables) throws Exception {
Exception ex = null;
for (AutoCloseable closeable : closeables) {
try {
- closeable.close();
+ if (closeable != null)
+ closeable.close();
}
catch (Exception e) {
if (ex == null)
@@ -469,4 +475,16 @@ public class IgniteUtils {
if (ex != null)
throw ex;
}
+
+ /**
+ * Closes all provided objects by delegating to {@link #closeAll(Collection)}; {@code null} elements are skipped.
+ *
+ * @param closeables Array of closeable objects to close.
+ * @throws Exception If failed to close.
+ *
+ * @see #closeAll(Collection)
+ */
+ public static void closeAll(AutoCloseable... closeables) throws Exception {
+ closeAll(Arrays.asList(closeables));
+ }
}
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/testframework/WorkDirectoryExtension.java b/modules/core/src/test/java/org/apache/ignite/internal/testframework/WorkDirectoryExtension.java
index 43b44b8..54791a9 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/testframework/WorkDirectoryExtension.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/testframework/WorkDirectoryExtension.java
@@ -41,7 +41,8 @@ import org.junit.platform.commons.support.HierarchyTraversalMode;
* {@link WorkDirectory} annotation.
* <p>
* A new temporary folder is created for every test method and will be located relative to the module,
- * where the tests are being run, by the following path: "target/work/{@literal <name-of-the-test-method>}".
+ * where the tests are being run, by the following path:
+ * "target/work/{@literal <name-of-the-test-class>/<name-of-the-test-method>_<current_time_millis>}".
* It is removed after a test has finished running, but this behaviour can be controlled by setting the
* {@link WorkDirectoryExtension#KEEP_WORK_DIR_PROPERTY} property to {@code true}, in which case the created folder can
* be kept intact for debugging purposes.
@@ -110,7 +111,11 @@ public class WorkDirectoryExtension implements BeforeEachCallback, AfterEachCall
if (shouldRemoveDir())
IgniteUtils.deleteIfExists(BASE_PATH);
- Path workDir = BASE_PATH.resolve(extensionContext.getRequiredTestMethod().getName());
+ String testClassDir = extensionContext.getRequiredTestClass().getSimpleName();
+
+ String testMethodDir = extensionContext.getRequiredTestMethod().getName() + '_' + System.currentTimeMillis();
+
+ Path workDir = BASE_PATH.resolve(testClassDir).resolve(testMethodDir);
Files.createDirectories(workDir);
diff --git a/modules/metastorage-client/pom.xml b/modules/metastorage-client/pom.xml
index ffb6ef1..0041b99 100644
--- a/modules/metastorage-client/pom.xml
+++ b/modules/metastorage-client/pom.xml
@@ -86,5 +86,12 @@
<artifactId>ignite-metastorage-server</artifactId>
<scope>test</scope>
</dependency>
+
+ <dependency>
+ <groupId>org.apache.ignite</groupId>
+ <artifactId>ignite-core</artifactId>
+ <type>test-jar</type>
+ <scope>test</scope>
+ </dependency>
</dependencies>
</project>
diff --git a/modules/metastorage-client/src/integrationTest/java/org/apache/ignite/internal/metastorage/client/ITMetaStorageServicePersistenceTest.java b/modules/metastorage-client/src/integrationTest/java/org/apache/ignite/internal/metastorage/client/ITMetaStorageServicePersistenceTest.java
new file mode 100644
index 0000000..3783ccc
--- /dev/null
+++ b/modules/metastorage-client/src/integrationTest/java/org/apache/ignite/internal/metastorage/client/ITMetaStorageServicePersistenceTest.java
@@ -0,0 +1,426 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.metastorage.client;
+
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.UUID;
+import java.util.concurrent.ExecutionException;
+import java.util.function.BooleanSupplier;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+import org.apache.ignite.internal.metastorage.server.KeyValueStorage;
+import org.apache.ignite.internal.metastorage.server.persistence.RocksDBKeyValueStorage;
+import org.apache.ignite.internal.metastorage.server.raft.MetaStorageListener;
+import org.apache.ignite.internal.raft.server.impl.JRaftServerImpl;
+import org.apache.ignite.internal.raft.server.impl.JRaftServerImpl.DelegatingStateMachine;
+import org.apache.ignite.internal.testframework.WorkDirectory;
+import org.apache.ignite.internal.testframework.WorkDirectoryExtension;
+import org.apache.ignite.internal.util.IgniteUtils;
+import org.apache.ignite.lang.ByteArray;
+import org.apache.ignite.network.ClusterLocalConfiguration;
+import org.apache.ignite.network.ClusterService;
+import org.apache.ignite.network.ClusterServiceFactory;
+import org.apache.ignite.network.MessageSerializationRegistryImpl;
+import org.apache.ignite.network.NetworkAddress;
+import org.apache.ignite.network.scalecube.TestScaleCubeClusterServiceFactory;
+import org.apache.ignite.network.serialization.MessageSerializationRegistry;
+import org.apache.ignite.raft.client.Peer;
+import org.apache.ignite.raft.client.message.RaftClientMessagesFactory;
+import org.apache.ignite.raft.client.service.RaftGroupService;
+import org.apache.ignite.raft.client.service.impl.RaftGroupServiceImpl;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.MethodSource;
+
+import static java.lang.Thread.sleep;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/**
+ * Persistent (rocksdb-based) meta storage client tests.
+ */
+@ExtendWith(WorkDirectoryExtension.class)
+public class ITMetaStorageServicePersistenceTest {
+ /** Base server port; the server with index {@code idx} listens on {@code PORT + idx}. */
+ private static final int PORT = 5003;
+
+ /** Base client port; each new client uses {@code CLIENT_PORT} plus the number of already started clients. */
+ private static final int CLIENT_PORT = 6003;
+
+ /**
+ * Initial raft group peers: three local addresses on ports {@code PORT} to {@code PORT + 2}.
+ */
+ private static final List<Peer> INITIAL_CONF = IntStream.rangeClosed(0, 2)
+ .mapToObj(i -> new NetworkAddress(getLocalAddress(), PORT + i))
+ .map(Peer::new)
+ .collect(Collectors.toUnmodifiableList());
+
+ /** Name of the raft group that hosts the meta storage in this test. */
+ private static final String METASTORAGE_RAFT_GROUP_NAME = "METASTORAGE_RAFT_GROUP";
+
+ /** Raft client message factory passed to every {@link RaftGroupServiceImpl}. */
+ private static final RaftClientMessagesFactory FACTORY = new RaftClientMessagesFactory();
+
+ /** Factory used to create cluster services for both servers and clients. */
+ private static final ClusterServiceFactory NETWORK_FACTORY = new TestScaleCubeClusterServiceFactory();
+
+ /** Message serialization registry passed to every {@link ClusterLocalConfiguration}. */
+ private static final MessageSerializationRegistry SERIALIZATION_REGISTRY = new MessageSerializationRegistryImpl();
+
+ /** Work directory (see {@link WorkDirectory}); raft and key-value storage directories are resolved against it. */
+ @WorkDirectory
+ private Path workDir;
+
+ /** Every cluster service started by {@link #clusterService}. */
+ private final List<ClusterService> cluster = new ArrayList<>();
+
+ /** Running raft servers; shut down in {@link #afterTest()}. */
+ private final List<JRaftServerImpl> servers = new ArrayList<>();
+
+ /** Started raft clients; shut down in {@link #afterTest()}. */
+ private final List<RaftGroupService> clients = new ArrayList<>();
+
+ /**
+ * Shuts down all raft clients first and then all raft servers that are still registered by the test.
+ *
+ * @throws Exception If failed to shut down a raft server.
+ */
+ @AfterEach
+ public void afterTest() throws Exception {
+ for (RaftGroupService client : clients)
+ client.shutdown();
+
+ for (JRaftServerImpl server : servers)
+ server.shutdown();
+ }
+
+ /**
+ * Test parameters for {@link #testSnapshot}.
+ */
+ private static class TestData {
+ /** Whether to delete the stopped node's raft and key-value storage directories before it is restarted. */
+ private final boolean deleteFolder;
+
+ /** Whether to put one more entry into the meta storage after the second snapshot. */
+ private final boolean writeAfterSnapshot;
+
+ /** Stores both flags exactly as given. */
+ private TestData(boolean deleteFolder, boolean writeAfterSnapshot) {
+ this.deleteFolder = deleteFolder;
+ this.writeAfterSnapshot = writeAfterSnapshot;
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return String.format("deleteFolder=%s, writeAfterSnapshot=%s", deleteFolder, writeAfterSnapshot);
+ }
+ }
+
+ /**
+ * @return All four combinations of the {@link TestData} flags, used as {@link #testSnapshot} parameters.
+ */
+ private static List<TestData> testSnapshotData() {
+ return List.of(
+ new TestData(false, false),
+ new TestData(true, true),
+ new TestData(false, true),
+ new TestData(true, false)
+ );
+ }
+
+ /**
+ * Tests that a restarted raft node catches up with the group by restoring a snapshot.
+ *
+ * @param testData Test parameters.
+ * @throws Exception If failed.
+ */
+ @ParameterizedTest
+ @MethodSource("testSnapshotData")
+ public void testSnapshot(TestData testData) throws Exception {
+ ByteArray firstKey = ByteArray.fromString("first");
+ byte[] firstValue = "firstValue".getBytes(StandardCharsets.UTF_8);
+
+ // Setup a metastorage raft service
+ RaftGroupService metaStorageSvc = prepareMetaStorage();
+
+ MetaStorageServiceImpl metaStorage = new MetaStorageServiceImpl(metaStorageSvc);
+
+ // Put some data in the metastorage
+ metaStorage.put(firstKey, firstValue).get();
+
+ // Check that data has been written successfully
+ check(metaStorage, new EntryImpl(firstKey, firstValue, 1, 1));
+
+ // Select any node that is not the leader of the group
+ JRaftServerImpl toStop = servers.stream()
+ .filter(server -> !server.localPeer(METASTORAGE_RAFT_GROUP_NAME).equals(metaStorageSvc.leader()))
+ .findAny()
+ .orElseThrow();
+
+ // Get the path to that node's raft directory
+ String serverDataPath = toStop.getServerDataPath(METASTORAGE_RAFT_GROUP_NAME);
+
+ // Get the path to that node's RocksDB key-value storage
+ Path dbPath = getStorage(toStop).getDbPath();
+
+ int stopIdx = servers.indexOf(toStop);
+
+ // Remove that node from the list of servers
+ servers.remove(stopIdx);
+
+ // Shutdown that node
+ toStop.shutdown();
+
+ // Create a raft snapshot of the metastorage service
+ metaStorageSvc.snapshot(metaStorageSvc.leader()).get();
+
+ // Remove the first key from the metastorage
+ metaStorage.remove(firstKey).get();
+
+ // Check that data has been removed
+ check(metaStorage, new EntryImpl(firstKey, null, 2, 2));
+
+ // Put same data again
+ metaStorage.put(firstKey, firstValue).get();
+
+ // Check that it has been written
+ check(metaStorage, new EntryImpl(firstKey, firstValue, 3, 3));
+
+ // Create another raft snapshot
+ metaStorageSvc.snapshot(metaStorageSvc.leader()).get();
+
+ byte[] lastKey = firstKey.bytes();
+ byte[] lastValue = firstValue;
+
+ if (testData.deleteFolder) {
+ // Delete stopped node's raft directory and key-value storage directory
+ // to check if snapshot could be restored by the restarted node
+ IgniteUtils.deleteIfExists(dbPath);
+ IgniteUtils.deleteIfExists(Paths.get(serverDataPath));
+ }
+
+ if (testData.writeAfterSnapshot) {
+ // Put new data after the second snapshot to check if after
+ // the snapshot restore operation restarted node will receive it
+ ByteArray secondKey = ByteArray.fromString("second");
+ byte[] secondValue = "secondValue".getBytes(StandardCharsets.UTF_8);
+
+ metaStorage.put(secondKey, secondValue).get();
+
+ lastKey = secondKey.bytes();
+ lastValue = secondValue;
+ }
+
+ // Restart the node
+ JRaftServerImpl restarted = startServer(stopIdx, new RocksDBKeyValueStorage(dbPath));
+
+ assertTrue(waitForTopology(cluster.get(0), servers.size(), 3_000));
+
+ KeyValueStorage storage = getStorage(restarted);
+
+ byte[] finalLastKey = lastKey;
+
+ int expectedRevision = testData.writeAfterSnapshot ? 4 : 3;
+ int expectedUpdateCounter = testData.writeAfterSnapshot ? 4 : 3;
+
+ EntryImpl expectedLastEntry = new EntryImpl(new ByteArray(lastKey), lastValue, expectedRevision, expectedUpdateCounter);
+
+ // Wait until the snapshot is restored
+ boolean success = waitForCondition(() -> {
+ org.apache.ignite.internal.metastorage.server.Entry e = storage.get(finalLastKey);
+ return e.empty() == expectedLastEntry.empty()
+ && e.tombstone() == expectedLastEntry.tombstone()
+ && e.revision() == expectedLastEntry.revision()
+ && e.updateCounter() == expectedLastEntry.updateCounter()
+ && Arrays.equals(e.key(), expectedLastEntry.key().bytes())
+ && Arrays.equals(e.value(), expectedLastEntry.value());
+ }, 3_000);
+
+ assertTrue(success);
+
+ // Check that the last value has been written successfully
+ check(metaStorage, expectedLastEntry);
+ }
+
+ /**
+ * Extracts the key-value storage from the meta storage raft group's state machine listener of the given server.
+ *
+ * @param server Server.
+ * @return Meta store's key value storage, cast to {@link RocksDBKeyValueStorage}.
+ */
+ private static RocksDBKeyValueStorage getStorage(JRaftServerImpl server) {
+ org.apache.ignite.raft.jraft.RaftGroupService svc = server.raftGroupService(METASTORAGE_RAFT_GROUP_NAME);
+
+ DelegatingStateMachine fsm = (DelegatingStateMachine) svc.getRaftNode().getOptions().getFsm();
+
+ MetaStorageListener listener = (MetaStorageListener) fsm.getListener();
+
+ KeyValueStorage storage = listener.getStorage();
+
+ return (RocksDBKeyValueStorage) storage;
+ }
+
+ /**
+ * Reads the entry stored under the expected entry's key and asserts that it equals the expected entry.
+ *
+ * @param metaStorage Meta storage service.
+ * @param expected Expected entry.
+ * @throws ExecutionException If failed.
+ * @throws InterruptedException If failed.
+ */
+ private void check(MetaStorageServiceImpl metaStorage, EntryImpl expected)
+ throws ExecutionException, InterruptedException {
+ Entry entry = metaStorage.get(expected.key()).get();
+
+ assertEquals(expected, entry);
+ }
+
+ /** Polls {@code cond} every 50 ms until it holds or {@code timeout} ms elapse; stops early when interrupted. */
+ @SuppressWarnings("BusyWait") private static boolean waitForCondition(BooleanSupplier cond, long timeout) {
+ long stop = System.currentTimeMillis() + timeout;
+
+ while (System.currentTimeMillis() < stop) {
+ if (cond.getAsBoolean())
+ return true;
+
+ try {
+ sleep(50);
+ }
+ catch (InterruptedException e) {
+ Thread.currentThread().interrupt(); // Restore the flag so the calling test still observes the interruption.
+ return false;
+ }
+ }
+ return false;
+ }
+
+ /**
+ * @param cluster The cluster service whose topology is polled.
+ * @param exp Minimal expected number of topology members.
+ * @param timeout The timeout in millis.
+ * @return {@code True} if the topology reached at least {@code exp} members within the timeout.
+ */
+ private boolean waitForTopology(ClusterService cluster, int exp, int timeout) {
+ return waitForCondition(() -> cluster.topologyService().allMembers().size() >= exp, timeout);
+ }
+
+ /**
+ * @return Host address of the local host; an {@link UnknownHostException} is rethrown as a {@link RuntimeException}.
+ */
+ private static String getLocalAddress() {
+ try {
+ return InetAddress.getLocalHost().getHostAddress();
+ }
+ catch (UnknownHostException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ /**
+ * Creates and starts a cluster service with the given name, port and server addresses and adds it to {@link #cluster}.
+ */
+ private ClusterService clusterService(String name, int port, List<NetworkAddress> servers) {
+ var context = new ClusterLocalConfiguration(name, port, servers, SERIALIZATION_REGISTRY);
+
+ var network = NETWORK_FACTORY.createClusterService(context);
+
+ network.start();
+
+ cluster.add(network);
+
+ return network;
+ }
+
+ /**
+ * Starts a raft server that hosts the meta storage raft group and adds it to {@link #servers}.
+ *
+ * @param idx Server index; the server listens on {@code PORT + idx} and uses the {@code "jraft" + idx} directory.
+ * @param storage KeyValueStorage for the MetaStorage.
+ * @return Started server; its {@code shutdown()} also shuts down the underlying cluster service.
+ */
+ private JRaftServerImpl startServer(int idx, KeyValueStorage storage) {
+ var addr = new NetworkAddress(getLocalAddress(), PORT);
+
+ ClusterService service = clusterService("server" + idx, PORT + idx, List.of(addr));
+
+ Path jraft = workDir.resolve("jraft" + idx);
+
+ JRaftServerImpl server = new JRaftServerImpl(service, jraft.toString()) {
+ @Override public void shutdown() throws Exception {
+ super.shutdown();
+
+ service.shutdown();
+ }
+ };
+
+ server.startRaftGroup(
+ METASTORAGE_RAFT_GROUP_NAME,
+ new MetaStorageListener(storage),
+ INITIAL_CONF
+ );
+
+ servers.add(server);
+
+ return server;
+ }
+
+ /**
+ * Starts one raft server per {@link #INITIAL_CONF} peer, each with a new {@link RocksDBKeyValueStorage} and a
+ * {@link MetaStorageListener}, asserts that the topology is assembled and then starts a client.
+ *
+ * @return Meta storage raft group service instance.
+ */
+ private RaftGroupService prepareMetaStorage() throws IOException {
+ for (int i = 0; i < INITIAL_CONF.size(); i++)
+ startServer(i, new RocksDBKeyValueStorage(workDir.resolve(UUID.randomUUID().toString())));
+
+ assertTrue(waitForTopology(cluster.get(0), servers.size(), 3_000));
+
+ return startClient(METASTORAGE_RAFT_GROUP_NAME, new NetworkAddress(getLocalAddress(), PORT));
+ }
+
+ /**
+ * Starts a raft client for the given group with the peer at {@code addr} and adds it to {@link #clients}.
+ */
+ private RaftGroupService startClient(String groupId, NetworkAddress addr) {
+ ClusterService clientNode = clusterService(
+ "client_" + groupId + "_", CLIENT_PORT + clients.size(), List.of(addr));
+
+ RaftGroupServiceImpl client = new RaftGroupServiceImpl(groupId, clientNode, FACTORY, 10_000,
+ List.of(new Peer(addr)), false, 200) {
+ @Override public void shutdown() {
+ super.shutdown();
+
+ clientNode.shutdown();
+ }
+ };
+
+ clients.add(client);
+
+ return client;
+ }
+}
diff --git a/modules/metastorage-client/src/integrationTest/java/org/apache/ignite/internal/metastorage/client/ITMetaStorageServiceTest.java b/modules/metastorage-client/src/integrationTest/java/org/apache/ignite/internal/metastorage/client/ITMetaStorageServiceTest.java
index 78a6a41..19a95a1 100644
--- a/modules/metastorage-client/src/integrationTest/java/org/apache/ignite/internal/metastorage/client/ITMetaStorageServiceTest.java
+++ b/modules/metastorage-client/src/integrationTest/java/org/apache/ignite/internal/metastorage/client/ITMetaStorageServiceTest.java
@@ -17,6 +17,7 @@
package org.apache.ignite.internal.metastorage.client;
+import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
@@ -25,6 +26,7 @@ import java.util.Map;
import java.util.NavigableMap;
import java.util.Set;
import java.util.TreeMap;
+import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
@@ -32,6 +34,8 @@ import java.util.stream.IntStream;
import org.apache.ignite.internal.metastorage.common.OperationType;
import org.apache.ignite.internal.metastorage.server.KeyValueStorage;
import org.apache.ignite.internal.metastorage.server.raft.MetaStorageListener;
+import org.apache.ignite.internal.raft.server.RaftServer;
+import org.apache.ignite.internal.raft.server.impl.RaftServerImpl;
import org.apache.ignite.internal.util.Cursor;
import org.apache.ignite.lang.ByteArray;
import org.apache.ignite.lang.IgniteLogger;
@@ -47,9 +51,8 @@ import org.apache.ignite.raft.client.Peer;
import org.apache.ignite.raft.client.message.RaftClientMessagesFactory;
import org.apache.ignite.raft.client.service.RaftGroupService;
import org.apache.ignite.raft.client.service.impl.RaftGroupServiceImpl;
-import org.apache.ignite.internal.raft.server.RaftServer;
-import org.apache.ignite.internal.raft.server.impl.RaftServerImpl;
import org.jetbrains.annotations.NotNull;
+import org.jetbrains.annotations.Nullable;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Disabled;
@@ -818,7 +821,7 @@ public class ITMetaStorageServiceTest {
MetaStorageService metaStorageSvc = prepareMetaStorage(
new AbstractKeyValueStorage() {
- @Override public Cursor<org.apache.ignite.internal.metastorage.server.WatchEvent> watch(byte[] keyFrom, byte[] keyTo, long rev) {
+ @Override public Cursor<org.apache.ignite.internal.metastorage.server.WatchEvent> watch(byte[] keyFrom, byte @Nullable [] keyTo, long rev) {
return new Cursor<>() {
private final Iterator<org.apache.ignite.internal.metastorage.server.WatchEvent> it = new Iterator<>() {
@Override public boolean hasNext() {
@@ -1145,7 +1148,7 @@ public class ITMetaStorageServiceTest {
}
/** {@inheritDoc} */
- @Override public Cursor<org.apache.ignite.internal.metastorage.server.WatchEvent> watch(byte[] keyFrom, byte[] keyTo, long rev) {
+ @Override public Cursor<org.apache.ignite.internal.metastorage.server.WatchEvent> watch(byte[] keyFrom, byte @Nullable [] keyTo, long rev) {
fail();
return null;
@@ -1169,5 +1172,23 @@ public class ITMetaStorageServiceTest {
@Override public void compact() {
fail();
}
+
+ /** {@inheritDoc} */
+ @Override public void close() {
+ fail();
+ }
+
+ /** {@inheritDoc} */
+ @NotNull
+ @Override public CompletableFuture<Void> snapshot(Path snapshotPath) {
+ fail();
+
+ return null;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void restoreSnapshot(Path snapshotPath) {
+ fail();
+ }
}
}
diff --git a/modules/metastorage-server/pom.xml b/modules/metastorage-server/pom.xml
index 5f0f453..07bb1df 100644
--- a/modules/metastorage-server/pom.xml
+++ b/modules/metastorage-server/pom.xml
@@ -44,6 +44,11 @@
</dependency>
<dependency>
+ <groupId>org.rocksdb</groupId>
+ <artifactId>rocksdbjni</artifactId>
+ </dependency>
+
+ <dependency>
<groupId>org.jetbrains</groupId>
<artifactId>annotations</artifactId>
</dependency>
@@ -60,5 +65,12 @@
<artifactId>junit-jupiter-engine</artifactId>
<scope>test</scope>
</dependency>
+
+ <dependency>
+ <groupId>org.apache.ignite</groupId>
+ <artifactId>ignite-core</artifactId>
+ <type>test-jar</type>
+ <scope>test</scope>
+ </dependency>
</dependencies>
</project>
diff --git a/modules/metastorage-server/src/main/java/org/apache/ignite/internal/metastorage/server/KeyValueStorage.java b/modules/metastorage-server/src/main/java/org/apache/ignite/internal/metastorage/server/KeyValueStorage.java
index d4c7da5..9250d7d 100644
--- a/modules/metastorage-server/src/main/java/org/apache/ignite/internal/metastorage/server/KeyValueStorage.java
+++ b/modules/metastorage-server/src/main/java/org/apache/ignite/internal/metastorage/server/KeyValueStorage.java
@@ -17,15 +17,18 @@
package org.apache.ignite.internal.metastorage.server;
+import java.nio.file.Path;
import java.util.Collection;
import java.util.List;
+import java.util.concurrent.CompletableFuture;
import org.apache.ignite.internal.util.Cursor;
import org.jetbrains.annotations.NotNull;
+import org.jetbrains.annotations.Nullable;
/**
* Defines key/value storage interface.
*/
-public interface KeyValueStorage {
+public interface KeyValueStorage extends AutoCloseable {
/**
* Returns storage revision.
*
@@ -177,7 +180,7 @@ public interface KeyValueStorage {
* @param rev Start revision number.
* @return Cursor by update events.
*/
- Cursor<WatchEvent> watch(byte[] keyFrom, byte[] keyTo, long rev);
+ Cursor<WatchEvent> watch(byte[] keyFrom, byte @Nullable [] keyTo, long rev);
/**
* Creates subscription on updates of entries corresponding to the given keys range (where upper bound is unlimited)
@@ -203,4 +206,20 @@ public interface KeyValueStorage {
* Compacts storage (removes tombstones).
*/
void compact();
+
+ /**
+ * Creates a snapshot of the storage's current state in the specified directory.
+ *
+ * @param snapshotPath Directory to store a snapshot.
+ * @return Future representing pending completion of the operation. Cannot be {@code null}.
+ */
+ @NotNull
+ CompletableFuture<Void> snapshot(Path snapshotPath);
+
+ /**
+ * Restores a state of the storage which was previously captured with a {@link #snapshot(Path)}.
+ *
+ * @param snapshotPath Path to the snapshot's directory.
+ */
+ void restoreSnapshot(Path snapshotPath);
}
diff --git a/modules/metastorage-server/src/main/java/org/apache/ignite/internal/metastorage/server/SimpleInMemoryKeyValueStorage.java b/modules/metastorage-server/src/main/java/org/apache/ignite/internal/metastorage/server/SimpleInMemoryKeyValueStorage.java
index bab95a8..26efb72 100644
--- a/modules/metastorage-server/src/main/java/org/apache/ignite/internal/metastorage/server/SimpleInMemoryKeyValueStorage.java
+++ b/modules/metastorage-server/src/main/java/org/apache/ignite/internal/metastorage/server/SimpleInMemoryKeyValueStorage.java
@@ -17,6 +17,7 @@
package org.apache.ignite.internal.metastorage.server;
+import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
@@ -28,9 +29,11 @@ import java.util.NavigableMap;
import java.util.NoSuchElementException;
import java.util.TreeMap;
import java.util.TreeSet;
+import java.util.concurrent.CompletableFuture;
import java.util.function.Predicate;
import org.apache.ignite.internal.util.Cursor;
import org.jetbrains.annotations.NotNull;
+import org.jetbrains.annotations.Nullable;
import static org.apache.ignite.internal.metastorage.server.Value.TOMBSTONE;
@@ -284,7 +287,7 @@ public class SimpleInMemoryKeyValueStorage implements KeyValueStorage {
}
/** {@inheritDoc} */
- @Override public Cursor<WatchEvent> watch(byte[] keyFrom, byte[] keyTo, long rev) {
+ @Override public Cursor<WatchEvent> watch(byte[] keyFrom, byte @Nullable [] keyTo, long rev) {
assert keyFrom != null : "keyFrom couldn't be null.";
assert rev > 0 : "rev must be positive.";
@@ -328,6 +331,22 @@ public class SimpleInMemoryKeyValueStorage implements KeyValueStorage {
}
}
+ /** {@inheritDoc} */
+ @Override public void close() {
+ // No-op.
+ }
+
+ /** {@inheritDoc} */
+ @NotNull
+ @Override public CompletableFuture<Void> snapshot(Path snapshotPath) {
+ throw new UnsupportedOperationException();
+ }
+
+ /** {@inheritDoc} */
+ @Override public void restoreSnapshot(Path snapshotPath) {
+ throw new UnsupportedOperationException();
+ }
+
/** */
private boolean doRemove(byte[] key, long curRev) {
Entry e = doGet(key, LATEST_REV, false);
diff --git a/modules/metastorage-server/src/main/java/org/apache/ignite/internal/metastorage/server/Value.java b/modules/metastorage-server/src/main/java/org/apache/ignite/internal/metastorage/server/Value.java
index ce79b17..e7dff54 100644
--- a/modules/metastorage-server/src/main/java/org/apache/ignite/internal/metastorage/server/Value.java
+++ b/modules/metastorage-server/src/main/java/org/apache/ignite/internal/metastorage/server/Value.java
@@ -66,7 +66,7 @@ public class Value {
*
* @return {@code True} if value is tombstone, otherwise - {@code false}.
*/
- boolean tombstone() {
+ public boolean tombstone() {
return bytes == TOMBSTONE;
}
}
diff --git a/modules/metastorage-server/src/main/java/org/apache/ignite/internal/metastorage/server/persistence/ColumnFamily.java b/modules/metastorage-server/src/main/java/org/apache/ignite/internal/metastorage/server/persistence/ColumnFamily.java
new file mode 100644
index 0000000..4052aeb
--- /dev/null
+++ b/modules/metastorage-server/src/main/java/org/apache/ignite/internal/metastorage/server/persistence/ColumnFamily.java
@@ -0,0 +1,158 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.metastorage.server.persistence;
+
+import java.util.List;
+import org.apache.ignite.internal.util.IgniteUtils;
+import org.jetbrains.annotations.NotNull;
+import org.jetbrains.annotations.Nullable;
+import org.rocksdb.ColumnFamilyHandle;
+import org.rocksdb.ColumnFamilyOptions;
+import org.rocksdb.IngestExternalFileOptions;
+import org.rocksdb.Options;
+import org.rocksdb.ReadOptions;
+import org.rocksdb.RocksDB;
+import org.rocksdb.RocksDBException;
+import org.rocksdb.RocksIterator;
+import org.rocksdb.WriteBatch;
+
+/**
+ * Wrapper for the column family that encapsulates {@link ColumnFamilyHandle} and RocksDB's operations with it.
+ * <p>
+ * Reads, iterator creation and file ingestion are issued directly against the {@link RocksDB} instance, whereas
+ * {@code put} and {@code delete} only add the operation to a caller-supplied {@link WriteBatch}.
+ */
+class ColumnFamily implements AutoCloseable {
+ /** RocksDB instance. Used for the operations but not closed by this wrapper. */
+ private final RocksDB db;
+
+ /** Column family type. Provides the name returned by {@link #name()}. */
+ private final StorageColumnFamilyType cfType;
+
+ /** Column family handle. */
+ private final ColumnFamilyHandle cfHandle;
+
+ /** Column family options. */
+ private final ColumnFamilyOptions cfOptions;
+
+ /** Options object associated with {@link #cfOptions}; closed together with them. */
+ private final Options options;
+
+ /**
+ * Constructor. Only stores the passed references, no RocksDB call is made here.
+ *
+ * @param db Db.
+ * @param handle Column family handle.
+ * @param cfType Column family type.
+ * @param cfOptions Column family options.
+ * @param options Options for the column family options.
+ */
+ ColumnFamily(
+ RocksDB db,
+ ColumnFamilyHandle handle,
+ StorageColumnFamilyType cfType,
+ ColumnFamilyOptions cfOptions,
+ Options options
+ ) {
+ this.db = db;
+ this.cfType = cfType;
+ this.cfOptions = cfOptions;
+ this.options = options;
+ this.cfHandle = handle;
+ }
+
+ /**
+ * Passes the column family handle, the column family options and the options to
+ * {@link IgniteUtils#closeAll}. The {@link RocksDB} instance is not passed, so it is left to its owner.
+ *
+ * @throws Exception If closing failed.
+ */
+ @Override public void close() throws Exception {
+ IgniteUtils.closeAll(cfHandle, cfOptions, options);
+ }
+
+ /**
+ * Gets the value associated with the key from this column family.
+ *
+ * @param key Key.
+ * @return Value, may be {@code null}.
+ * @throws RocksDBException If failed.
+ * @see RocksDB#get(ColumnFamilyHandle, byte[])
+ */
+ byte @Nullable [] get(byte @NotNull [] key) throws RocksDBException {
+ return db.get(cfHandle, key);
+ }
+
+ /**
+ * Puts a key-value pair into this column family within the write batch.
+ * Nothing is written to the database by this call itself; only the batch is modified.
+ *
+ * @param batch Write batch.
+ * @param key Key.
+ * @param value Value.
+ * @throws RocksDBException If failed.
+ * @see WriteBatch#put(ColumnFamilyHandle, byte[], byte[])
+ */
+ void put(WriteBatch batch, byte @NotNull [] key, byte @NotNull [] value) throws RocksDBException {
+ batch.put(cfHandle, key, value);
+ }
+
+ /**
+ * Deletes the entry mapped by the key and associated with this column family within the write batch.
+ * Nothing is written to the database by this call itself; only the batch is modified.
+ *
+ * @param batch Write batch.
+ * @param key Key.
+ * @throws RocksDBException If failed.
+ * @see WriteBatch#delete(ColumnFamilyHandle, byte[])
+ */
+ void delete(WriteBatch batch, byte @NotNull [] key) throws RocksDBException {
+ batch.delete(cfHandle, key);
+ }
+
+ /**
+ * Creates a new iterator over this column family.
+ *
+ * @return Iterator.
+ * @see RocksDB#newIterator(ColumnFamilyHandle)
+ */
+ RocksIterator newIterator() {
+ return db.newIterator(cfHandle);
+ }
+
+ /**
+ * Creates a new iterator with given read options over this column family.
+ *
+ * @param options Read options.
+ * @return Iterator.
+ * @see RocksDB#newIterator(ColumnFamilyHandle, ReadOptions)
+ */
+ RocksIterator newIterator(ReadOptions options) {
+ return db.newIterator(cfHandle, options);
+ }
+
+ /**
+ * Ingests external files into this column family.
+ *
+ * @param paths Paths to the external files.
+ * @param options Ingestion options.
+ * @throws RocksDBException If failed.
+ * @see RocksDB#ingestExternalFile(ColumnFamilyHandle, List, IngestExternalFileOptions)
+ */
+ void ingestExternalFile(List<String> paths, IngestExternalFileOptions options) throws RocksDBException {
+ db.ingestExternalFile(cfHandle, paths, options);
+ }
+
+ /**
+ * @return Name of the column family, taken from the {@link StorageColumnFamilyType} constant name.
+ */
+ String name() {
+ return cfType.name();
+ }
+}
diff --git a/modules/metastorage-server/src/main/java/org/apache/ignite/internal/metastorage/server/persistence/RangeCursor.java b/modules/metastorage-server/src/main/java/org/apache/ignite/internal/metastorage/server/persistence/RangeCursor.java
new file mode 100644
index 0000000..826268e
--- /dev/null
+++ b/modules/metastorage-server/src/main/java/org/apache/ignite/internal/metastorage/server/persistence/RangeCursor.java
@@ -0,0 +1,184 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.metastorage.server.persistence;
+
+import java.util.Arrays;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.NoSuchElementException;
+import org.apache.ignite.internal.metastorage.server.Entry;
+import org.apache.ignite.internal.util.Cursor;
+import org.jetbrains.annotations.NotNull;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Cursor by entries which correspond to the given keys range.
+ * <p>
+ * Keys are visited in the order given by the storage's revision index, starting from {@code keyFrom} (inclusive)
+ * and stopping before {@code keyTo} (exclusive) when it is not {@code null}. For every key the revision is chosen
+ * by {@link RocksDBKeyValueStorage#maxRevision} against the cursor's revision bound; keys for which it returns
+ * {@code -1} are skipped.
+ * <p>
+ * The cursor and the iterator returned by {@link #iterator()} are the same object state-wise: they share position.
+ */
+class RangeCursor implements Cursor<Entry> {
+ /** Storage. */
+ private final RocksDBKeyValueStorage storage;
+
+ /** Lower iteration bound (included). */
+ private final byte[] keyFrom;
+
+ /** Upper iteration bound (excluded). {@code null} means no upper bound. */
+ private final byte @Nullable [] keyTo;
+
+ /** Revision upper bound (included). */
+ private final long rev;
+
+ /** Iterator. */
+ private final Iterator<Entry> it;
+
+ /** Next entry: a look-ahead value prepared by {@code hasNext()} and consumed by {@code next()}. */
+ @Nullable
+ private Entry nextRetEntry;
+
+ /** Key of the last returned entry. {@code null} until the first entry is returned. */
+ private byte[] lastRetKey;
+
+ /**
+ * {@code true} if the iteration is finished.
+ */
+ private boolean finished;
+
+ /**
+ * Constructor.
+ *
+ * @param storage Storage.
+ * @param keyFrom {@link #keyFrom}.
+ * @param keyTo {@link #keyTo}.
+ * @param rev {@link #rev}.
+ */
+ RangeCursor(RocksDBKeyValueStorage storage, byte[] keyFrom, byte @Nullable [] keyTo, long rev) {
+ this.storage = storage;
+ this.keyFrom = keyFrom;
+ this.keyTo = keyTo;
+ this.rev = rev;
+ this.it = createIterator();
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean hasNext() {
+ return it.hasNext();
+ }
+
+ /** {@inheritDoc} */
+ @Override public Entry next() {
+ return it.next();
+ }
+
+ /** {@inheritDoc} */
+ @Override public void close() throws Exception {
+ // No-op.
+ }
+
+ /** {@inheritDoc} */
+ @NotNull
+ @Override public Iterator<Entry> iterator() {
+ // The very same iterator instance is returned on every call.
+ return it;
+ }
+
+ /**
+ * Creates an iterator for this cursor.
+ * Both iterator methods run under the storage's read lock.
+ *
+ * @return Iterator.
+ */
+ @NotNull
+ private Iterator<Entry> createIterator() {
+ return new Iterator<>() {
+ /** {@inheritDoc} */
+ @Override public boolean hasNext() {
+ storage.lock().readLock().lock();
+
+ try {
+ // The outer loop re-evaluates the terminal conditions after the inner search loop exits.
+ while (true) {
+ if (finished)
+ return false;
+
+ // A look-ahead entry has already been prepared and not yet consumed.
+ if (nextRetEntry != null)
+ return true;
+
+ // Search resumes right after the last returned key (or from keyFrom on the first call).
+ byte[] key = lastRetKey;
+
+ while (nextRetEntry == null) {
+ Map.Entry<byte[], long[]> e =
+ key == null ? storage.revisionCeilingEntry(keyFrom) : storage.revisionHigherEntry(key);
+
+ // No more keys in the index.
+ if (e == null) {
+ finished = true;
+
+ break;
+ }
+
+ key = e.getKey();
+
+ // The upper bound is exclusive.
+ if (keyTo != null && RocksDBKeyValueStorage.CMP.compare(key, keyTo) >= 0) {
+ finished = true;
+
+ break;
+ }
+
+ long[] revs = e.getValue();
+
+ assert revs != null && revs.length != 0 :
+ "Revisions should not be empty or null: [revs=" + Arrays.toString(revs) + ']';
+
+ long lastRev = RocksDBKeyValueStorage.maxRevision(revs, rev);
+
+ // No suitable revision for this key: move on to the next key.
+ if (lastRev == -1)
+ continue;
+
+ Entry entry = storage.doGetValue(key, lastRev);
+
+ assert !entry.empty() : "Iterator should not return empty entry.";
+
+ nextRetEntry = entry;
+ }
+ }
+ }
+ finally {
+ storage.lock().readLock().unlock();
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public Entry next() {
+ storage.lock().readLock().lock();
+
+ try {
+ if (!hasNext())
+ throw new NoSuchElementException();
+
+ Entry e = nextRetEntry;
+
+ // Consume the look-ahead so that the following hasNext() searches for a new entry.
+ nextRetEntry = null;
+
+ assert e != null;
+
+ lastRetKey = e.key();
+
+ return e;
+ }
+ finally {
+ storage.lock().readLock().unlock();
+ }
+ }
+ };
+ }
+}
diff --git a/modules/metastorage-server/src/main/java/org/apache/ignite/internal/metastorage/server/persistence/RocksDBKeyValueStorage.java b/modules/metastorage-server/src/main/java/org/apache/ignite/internal/metastorage/server/persistence/RocksDBKeyValueStorage.java
new file mode 100644
index 0000000..507307d
--- /dev/null
+++ b/modules/metastorage-server/src/main/java/org/apache/ignite/internal/metastorage/server/persistence/RocksDBKeyValueStorage.java
@@ -0,0 +1,1092 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.metastorage.server.persistence;
+
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeSet;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import org.apache.ignite.internal.metastorage.server.Condition;
+import org.apache.ignite.internal.metastorage.server.Entry;
+import org.apache.ignite.internal.metastorage.server.KeyValueStorage;
+import org.apache.ignite.internal.metastorage.server.Operation;
+import org.apache.ignite.internal.metastorage.server.Value;
+import org.apache.ignite.internal.metastorage.server.WatchEvent;
+import org.apache.ignite.internal.util.Cursor;
+import org.apache.ignite.internal.util.IgniteUtils;
+import org.apache.ignite.lang.IgniteBiTuple;
+import org.apache.ignite.lang.IgniteInternalException;
+import org.jetbrains.annotations.NotNull;
+import org.jetbrains.annotations.Nullable;
+import org.jetbrains.annotations.TestOnly;
+import org.rocksdb.ColumnFamilyDescriptor;
+import org.rocksdb.ColumnFamilyHandle;
+import org.rocksdb.ColumnFamilyOptions;
+import org.rocksdb.DBOptions;
+import org.rocksdb.EnvOptions;
+import org.rocksdb.IngestExternalFileOptions;
+import org.rocksdb.Options;
+import org.rocksdb.ReadOptions;
+import org.rocksdb.RocksDB;
+import org.rocksdb.RocksDBException;
+import org.rocksdb.RocksIterator;
+import org.rocksdb.Snapshot;
+import org.rocksdb.SstFileWriter;
+import org.rocksdb.WriteBatch;
+import org.rocksdb.WriteOptions;
+
+import static org.apache.ignite.internal.metastorage.server.Value.TOMBSTONE;
+import static org.apache.ignite.internal.metastorage.server.persistence.RocksStorageUtils.appendLong;
+import static org.apache.ignite.internal.metastorage.server.persistence.RocksStorageUtils.bytesToLong;
+import static org.apache.ignite.internal.metastorage.server.persistence.RocksStorageUtils.bytesToValue;
+import static org.apache.ignite.internal.metastorage.server.persistence.RocksStorageUtils.find;
+import static org.apache.ignite.internal.metastorage.server.persistence.RocksStorageUtils.forEach;
+import static org.apache.ignite.internal.metastorage.server.persistence.RocksStorageUtils.getAsLongs;
+import static org.apache.ignite.internal.metastorage.server.persistence.RocksStorageUtils.keyToRocksKey;
+import static org.apache.ignite.internal.metastorage.server.persistence.RocksStorageUtils.longToBytes;
+import static org.apache.ignite.internal.metastorage.server.persistence.RocksStorageUtils.valueToBytes;
+import static org.apache.ignite.internal.metastorage.server.persistence.StorageColumnFamilyType.DATA;
+import static org.apache.ignite.internal.metastorage.server.persistence.StorageColumnFamilyType.INDEX;
+import static org.apache.ignite.internal.util.ArrayUtils.LONG_EMPTY_ARRAY;
+
+/**
+ * Key-value storage based on RocksDB.
+ * Keys are stored with revision.
+ * Values are stored in the default column family with an update counter and a boolean flag which represents
+ * whether this record is a tombstone.
+ * <br>
+ * Key: [8 bytes revision, N bytes key itself].
+ * <br>
+ * Value: [8 bytes update counter, 1 byte tombstone flag, N bytes value].
+ * <br>
+ * The mapping from the key to the set of the storage's revisions is stored in the "index" column family.
+ * A key represents the key of an entry and the value is a {@code byte[]} that represents a {@code long[]} where every
+ * item is a revision of the storage.
+ */
+public class RocksDBKeyValueStorage implements KeyValueStorage {
+ /** Suffix for the temporary snapshot folder. */
+ private static final String TMP_SUFFIX = ".tmp";
+
+ /** A revision to store with system entries. */
+ private static final long SYSTEM_REVISION_MARKER_VALUE = -1;
+
+ /** Revision key. */
+ private static final byte[] REVISION_KEY = keyToRocksKey(
+ SYSTEM_REVISION_MARKER_VALUE,
+ "SYSTEM_REVISION_KEY".getBytes(StandardCharsets.UTF_8)
+ );
+
+ /** Update counter key. */
+ private static final byte[] UPDATE_COUNTER_KEY = keyToRocksKey(
+ SYSTEM_REVISION_MARKER_VALUE,
+ "SYSTEM_UPDATE_COUNTER_KEY".getBytes(StandardCharsets.UTF_8)
+ );
+
+ static {
+ RocksDB.loadLibrary();
+ }
+
+ /** RocksDB options. */
+ private final DBOptions options;
+
+ /** RocksDb instance. */
+ private final RocksDB db;
+
+ /** Data column family. */
+ private final ColumnFamily data;
+
+ /** Index column family. */
+ private final ColumnFamily index;
+
+ /** RW lock. */
+ private final ReadWriteLock rwLock = new ReentrantReadWriteLock();
+
+ /** Thread-pool for snapshot operations execution. */
+ private final ExecutorService snapshotExecutor = Executors.newFixedThreadPool(2);
+
+ /**
+ * Special value for the revision number which means that operation should be applied
+ * to the latest revision of an entry.
+ */
+ private static final long LATEST_REV = -1;
+
+ /** Lexicographic order comparator. */
+ static final Comparator<byte[]> CMP = Arrays::compare;
+
+ /** Path to the rocksdb database. */
+ private final Path dbPath;
+
+ /** Revision. Will be incremented for each single-entry or multi-entry update operation. */
+ private long rev;
+
+ /** Update counter. Will be incremented for each update of any particular entry. */
+ private long updCntr;
+
+ /**
+ * Constructor.
+ * <p>
+ * Any database already present at {@code dbPath} is destroyed before the storage is opened, so the storage
+ * always starts empty.
+ *
+ * @param dbPath RocksDB path.
+ * @throws IgniteInternalException If the storage failed to start.
+ */
+ public RocksDBKeyValueStorage(Path dbPath) {
+ try {
+ options = new DBOptions()
+ .setCreateMissingColumnFamilies(true)
+ .setCreateIfMissing(true);
+
+ // Must be assigned before destroyRocksDB() is called, as that method reads this field.
+ this.dbPath = dbPath;
+
+ Options dataOptions = new Options().setCreateIfMissing(true)
+ // The prefix is the revision of an entry, so prefix length is the size of a long
+ .useFixedLengthPrefixExtractor(Long.BYTES);
+
+ ColumnFamilyOptions dataFamilyOptions = new ColumnFamilyOptions(dataOptions);
+
+ Options indexOptions = new Options().setCreateIfMissing(true);
+
+ ColumnFamilyOptions indexFamilyOptions = new ColumnFamilyOptions(indexOptions);
+
+ List<ColumnFamilyDescriptor> descriptors = Arrays.asList(
+ new ColumnFamilyDescriptor(DATA.nameAsBytes(), dataFamilyOptions),
+ new ColumnFamilyDescriptor(INDEX.nameAsBytes(), indexFamilyOptions)
+ );
+
+ var handles = new ArrayList<ColumnFamilyHandle>();
+
+ // Delete existing data, relying on the raft's snapshot and log playback
+ destroyRocksDB();
+
+ this.db = RocksDB.open(options, dbPath.toAbsolutePath().toString(), descriptors, handles);
+
+ // Handles are read by position: index 0 is used for DATA and index 1 for INDEX, matching the descriptors.
+ data = new ColumnFamily(db, handles.get(0), DATA, dataFamilyOptions, dataOptions);
+
+ index = new ColumnFamily(db, handles.get(1), INDEX, indexFamilyOptions, indexOptions);
+ }
+ catch (Exception e) {
+ // Best-effort cleanup; a failure to close is attached to the original exception.
+ // NOTE(review): the locally created Options/ColumnFamilyOptions are not reachable from close()
+ // if the failure happens before the ColumnFamily wrappers are created - confirm whether this matters.
+ try {
+ close();
+ }
+ catch (Exception exception) {
+ e.addSuppressed(exception);
+ }
+
+ throw new IgniteInternalException("Failed to start the storage", e);
+ }
+ }
+
+ /**
+  * Destroys the RocksDB database located at {@link #dbPath}.
+  * <p>
+  * Unlike removing the DB directory by hand, {@code destroyDB()} also handles a database that is spread over
+  * several directories (for example when different paths are set through DBOptions::db_paths,
+  * DBOptions::db_log_dir and DBOptions::wal_dir).
+  *
+  * @throws RocksDBException If failed.
+  */
+ private void destroyRocksDB() throws RocksDBException {
+     String location = dbPath.toString();
+
+     try (Options destroyOpts = new Options()) {
+         RocksDB.destroyDB(location, destroyOpts);
+     }
+ }
+
+ /** {@inheritDoc} */
+ @Override public void close() throws Exception {
+ IgniteUtils.shutdownAndAwaitTermination(snapshotExecutor, 10, TimeUnit.SECONDS);
+
+ IgniteUtils.closeAll(data, index, db, options);
+ }
+
+ /**
+ * {@inheritDoc}
+ * <p>
+ * The RocksDB point-in-time snapshot is taken synchronously on the calling thread; the SST files are then written
+ * asynchronously on the snapshot executor into a temporary directory ({@code snapshotPath} plus the ".tmp"
+ * suffix), which is renamed to {@code snapshotPath} only if writing succeeded.
+ */
+ @NotNull
+ @Override public CompletableFuture<Void> snapshot(Path snapshotPath) {
+ Path tempPath = Paths.get(snapshotPath.toString() + TMP_SUFFIX);
+
+ // Create a RocksDB point-in-time snapshot
+ Snapshot snapshot = db.getSnapshot();
+
+ return CompletableFuture.runAsync(() -> {
+ // (Re)create the temporary directory
+ IgniteUtils.deleteIfExists(tempPath);
+
+ try {
+ Files.createDirectories(tempPath);
+ }
+ catch (IOException e) {
+ throw new IgniteInternalException("Failed to create directory: " + tempPath, e);
+ }
+ }, snapshotExecutor).thenCompose(aVoid ->
+ // Create futures for capturing SST snapshots of the column families
+ CompletableFuture.allOf(createSstFile(data, snapshot, tempPath), createSstFile(index, snapshot, tempPath)))
+ .whenComplete((aVoid, throwable) -> {
+ // Release a snapshot
+ db.releaseSnapshot(snapshot);
+
+ // Snapshot is not actually closed here, because a Snapshot instance doesn't own a pointer, the
+ // database does. Calling close to maintain the AutoCloseable semantics
+ snapshot.close();
+
+ // On failure the temporary directory is left as is and the target directory is not touched.
+ if (throwable != null)
+ return;
+
+ // Delete snapshot directory if it already exists
+ IgniteUtils.deleteIfExists(snapshotPath);
+
+ try {
+ // Rename the temporary directory
+ Files.move(tempPath, snapshotPath);
+ }
+ catch (IOException e) {
+ throw new IgniteInternalException("Failed to rename: " + tempPath + " to " + snapshotPath, e);
+ }
+ });
+ }
+
+ /**
+ * Create an SST file for the column family.
+ * The file is named after the column family and is written on the snapshot executor from an iterator bound
+ * to the given point-in-time snapshot.
+ *
+ * @param columnFamily Column family.
+ * @param snapshot Point-in-time snapshot.
+ * @param path Directory to put the SST file in.
+ * @return Future representing pending completion of the operation. It completes exceptionally with
+ * {@link IgniteInternalException} if anything fails while writing.
+ */
+ private CompletableFuture<Void> createSstFile(ColumnFamily columnFamily, Snapshot snapshot, Path path) {
+ return CompletableFuture.runAsync(() -> {
+ try (
+ EnvOptions envOptions = new EnvOptions();
+ Options options = new Options();
+ ReadOptions readOptions = new ReadOptions().setSnapshot(snapshot);
+ RocksIterator it = columnFamily.newIterator(readOptions);
+ SstFileWriter sstFileWriter = new SstFileWriter(envOptions, options)
+ ) {
+ Path sstFile = path.resolve(columnFamily.name());
+
+ sstFileWriter.open(sstFile.toString());
+
+ it.seekToFirst();
+
+ // Presumably feeds every key-value pair of the iterator to the writer - see RocksStorageUtils.forEach.
+ forEach(it, sstFileWriter::put);
+
+ // NOTE(review): verify how finish() behaves when the column family has no entries at all.
+ sstFileWriter.finish();
+ }
+ catch (Throwable t) {
+ throw new IgniteInternalException("Failed to write snapshot: " + t.getMessage(), t);
+ }
+ }, snapshotExecutor);
+ }
+
+ /**
+  * {@inheritDoc}
+  * <p>
+  * Ingests one SST file per column family (named after the family) from the given directory and then reloads
+  * the revision and the update counter from the data column family. Runs under the write lock.
+  */
+ @Override public void restoreSnapshot(Path path) {
+     var writeLock = rwLock.writeLock();
+
+     writeLock.lock();
+
+     try (IngestExternalFileOptions ingestOptions = new IngestExternalFileOptions()) {
+         ColumnFamily[] families = {data, index};
+
+         for (ColumnFamily cf : families) {
+             Path sstFile = path.resolve(cf.name());
+
+             if (!Files.exists(sstFile))
+                 throw new IgniteInternalException("Snapshot not found: " + sstFile);
+
+             cf.ingestExternalFile(Collections.singletonList(sstFile.toString()), ingestOptions);
+         }
+
+         // System values travel inside the data column family, so they are re-read after ingestion.
+         byte[] revBytes = data.get(REVISION_KEY);
+
+         rev = bytesToLong(revBytes);
+
+         byte[] cntrBytes = data.get(UPDATE_COUNTER_KEY);
+
+         updCntr = bytesToLong(cntrBytes);
+     }
+     catch (RocksDBException e) {
+         throw new IgniteInternalException("Fail to ingest sst file at path: " + path, e);
+     }
+     finally {
+         writeLock.unlock();
+     }
+ }
+
+ /**
+ * {@inheritDoc}
+ * <p>
+ * NOTE(review): the field is returned without acquiring the storage lock - confirm callers tolerate that.
+ */
+ @Override public long revision() {
+ return rev;
+ }
+
+ /**
+ * {@inheritDoc}
+ * <p>
+ * NOTE(review): the field is returned without acquiring the storage lock - confirm callers tolerate that.
+ */
+ @Override public long updateCounter() {
+ return updCntr;
+ }
+
+ /**
+  * {@inheritDoc}
+  * <p>
+  * The entry, its index record and the system values are written in a single batch under the write lock.
+  */
+ @Override public void put(byte[] key, byte[] value) {
+     var writeLock = rwLock.writeLock();
+
+     writeLock.lock();
+
+     try (WriteBatch writeBatch = new WriteBatch()) {
+         long nextRev = rev + 1;
+         long nextCntr = updCntr + 1;
+
+         addDataToBatch(writeBatch, key, value, nextRev, nextCntr);
+         updateKeysIndex(writeBatch, key, nextRev);
+
+         fillAndWriteBatch(writeBatch, nextRev, nextCntr);
+     }
+     catch (RocksDBException e) {
+         throw new IgniteInternalException(e);
+     }
+     finally {
+         writeLock.unlock();
+     }
+ }
+
+ /**
+  * Appends a revision to the list of revisions stored for the key in the index column family.
+  * The current list is read from the database, while the extended list is only added to the batch.
+  *
+  * @param batch Write batch.
+  * @param key Key.
+  * @param curRev New revision for key.
+  */
+ private void updateKeysIndex(WriteBatch batch, byte[] key, long curRev) {
+     try {
+         byte @Nullable [] storedRevs = index.get(key);
+
+         byte[] extendedRevs = appendLong(storedRevs, curRev);
+
+         index.put(batch, key, extendedRevs);
+     }
+     catch (RocksDBException e) {
+         throw new IgniteInternalException(e);
+     }
+ }
+
+ /**
+  * Completes the batch with the system values (the update counter and the revision), writes it to the database
+  * and, once the write succeeded, publishes the new values to the in-memory fields.
+  *
+  * @param batch Write batch.
+  * @param newRev New revision.
+  * @param newCntr New update counter.
+  * @throws RocksDBException If failed.
+  */
+ private void fillAndWriteBatch(WriteBatch batch, long newRev, long newCntr) throws RocksDBException {
+     data.put(batch, UPDATE_COUNTER_KEY, longToBytes(newCntr));
+     data.put(batch, REVISION_KEY, longToBytes(newRev));
+
+     try (WriteOptions writeOpts = new WriteOptions()) {
+         db.write(writeOpts, batch);
+     }
+
+     // Reached only when the write did not throw.
+     rev = newRev;
+     updCntr = newCntr;
+ }
+
+ /**
+  * {@inheritDoc}
+  * <p>
+  * The previous revision of the key is remembered before the write and the entry at that revision is returned.
+  */
+ @NotNull
+ @Override public Entry getAndPut(byte[] key, byte[] value) {
+     var writeLock = rwLock.writeLock();
+
+     writeLock.lock();
+
+     try (WriteBatch writeBatch = new WriteBatch()) {
+         long nextRev = rev + 1;
+         long nextCntr = updCntr + 1;
+
+         long[] knownRevs = getRevisions(key);
+
+         // Revision 0 stands for "no previous revision".
+         long prevRev = 0;
+
+         if (knownRevs.length != 0)
+             prevRev = lastRevision(knownRevs);
+
+         addDataToBatch(writeBatch, key, value, nextRev, nextCntr);
+         updateKeysIndex(writeBatch, key, nextRev);
+
+         fillAndWriteBatch(writeBatch, nextRev, nextCntr);
+
+         return doGetValue(key, prevRev);
+     }
+     catch (RocksDBException e) {
+         throw new IgniteInternalException(e);
+     }
+     finally {
+         writeLock.unlock();
+     }
+ }
+
+ /**
+  * {@inheritDoc}
+  * <p>
+  * All entries share one new revision and are written in a single batch under the write lock.
+  */
+ @Override public void putAll(List<byte[]> keys, List<byte[]> values) {
+     var writeLock = rwLock.writeLock();
+
+     writeLock.lock();
+
+     try (WriteBatch writeBatch = new WriteBatch()) {
+         long nextRev = rev + 1;
+
+         long nextCntr = addAllToBatch(writeBatch, keys, values, nextRev);
+
+         keys.forEach(k -> updateKeysIndex(writeBatch, k, nextRev));
+
+         fillAndWriteBatch(writeBatch, nextRev, nextCntr);
+     }
+     catch (RocksDBException e) {
+         throw new IgniteInternalException(e);
+     }
+     finally {
+         writeLock.unlock();
+     }
+ }
+
+ /**
+  * {@inheritDoc}
+  * <p>
+  * The returned entries are read before the batch is written, using the new revision as the upper bound.
+  */
+ @NotNull
+ @Override public Collection<Entry> getAndPutAll(List<byte[]> keys, List<byte[]> values) {
+     var writeLock = rwLock.writeLock();
+
+     writeLock.lock();
+
+     try (WriteBatch writeBatch = new WriteBatch()) {
+         long nextRev = rev + 1;
+
+         Collection<Entry> previous = doGetAll(keys, nextRev);
+
+         long nextCntr = addAllToBatch(writeBatch, keys, values, nextRev);
+
+         keys.forEach(k -> updateKeysIndex(writeBatch, k, nextRev));
+
+         fillAndWriteBatch(writeBatch, nextRev, nextCntr);
+
+         return previous;
+     }
+     catch (RocksDBException e) {
+         throw new IgniteInternalException(e);
+     }
+     finally {
+         writeLock.unlock();
+     }
+ }
+
+ /**
+  * {@inheritDoc}
+  * <p>
+  * Reads the latest revision of the key under the read lock.
+  */
+ @NotNull
+ @Override public Entry get(byte[] key) {
+     var readLock = rwLock.readLock();
+
+     readLock.lock();
+
+     try {
+         return doGet(key, LATEST_REV, false);
+     }
+     finally {
+         readLock.unlock();
+     }
+ }
+
+ /**
+  * {@inheritDoc}
+  * <p>
+  * Looks the key up at exactly the given revision under the read lock.
+  */
+ @NotNull
+ @Override public Entry get(byte[] key, long rev) {
+     var readLock = rwLock.readLock();
+
+     readLock.lock();
+
+     try {
+         return doGet(key, rev, true);
+     }
+     finally {
+         readLock.unlock();
+     }
+ }
+
+ /**
+ * {@inheritDoc}
+ * <p>
+ * Delegates to {@code doGetAll} with the latest-revision marker; locking is done there.
+ */
+ @NotNull
+ @Override public Collection<Entry> getAll(List<byte[]> keys) {
+ return doGetAll(keys, LATEST_REV);
+ }
+
+ /**
+ * {@inheritDoc}
+ * <p>
+ * Delegates to {@code doGetAll} with the given revision upper bound; locking is done there.
+ */
+ @NotNull
+ @Override public Collection<Entry> getAll(List<byte[]> keys, long revUpperBound) {
+ return doGetAll(keys, revUpperBound);
+ }
+
+ /**
+  * {@inheritDoc}
+  * <p>
+  * Nothing is written (and the revision is not advanced) when the key has no live entry.
+  */
+ @Override public void remove(byte[] key) {
+     var writeLock = rwLock.writeLock();
+
+     writeLock.lock();
+
+     try (WriteBatch writeBatch = new WriteBatch()) {
+         long nextRev = rev + 1;
+         long nextCntr = updCntr + 1;
+
+         boolean removed = addToBatchForRemoval(writeBatch, key, nextRev, nextCntr);
+
+         if (!removed)
+             return;
+
+         updateKeysIndex(writeBatch, key, nextRev);
+
+         fillAndWriteBatch(writeBatch, nextRev, nextCntr);
+     }
+     catch (RocksDBException e) {
+         throw new IgniteInternalException(e);
+     }
+     finally {
+         writeLock.unlock();
+     }
+ }
+
+ /**
+  * {@inheritDoc}
+  * <p>
+  * An empty or already removed entry is returned as is; otherwise a tombstone is put and the previous entry
+  * is returned.
+  */
+ @NotNull
+ @Override public Entry getAndRemove(byte[] key) {
+     var writeLock = rwLock.writeLock();
+
+     writeLock.lock();
+
+     try {
+         Entry current = doGet(key, LATEST_REV, false);
+
+         boolean live = !current.empty() && !current.tombstone();
+
+         return live ? getAndPut(key, TOMBSTONE) : current;
+     }
+     finally {
+         writeLock.unlock();
+     }
+ }
+
+ /**
+  * {@inheritDoc}
+  * <p>
+  * Only keys with a live entry get a tombstone; the update counter advances once per such key. The batch
+  * (with the new revision) is written even when no key was removed.
+  */
+ @Override public void removeAll(List<byte[]> keys) {
+     var writeLock = rwLock.writeLock();
+
+     writeLock.lock();
+
+     try (WriteBatch writeBatch = new WriteBatch()) {
+         long nextRev = rev + 1;
+         long cntr = updCntr;
+
+         List<byte[]> removedKeys = new ArrayList<>(keys.size());
+
+         for (byte[] key : keys) {
+             if (!addToBatchForRemoval(writeBatch, key, nextRev, cntr + 1))
+                 continue;
+
+             removedKeys.add(key);
+
+             cntr++;
+         }
+
+         removedKeys.forEach(k -> updateKeysIndex(writeBatch, k, nextRev));
+
+         fillAndWriteBatch(writeBatch, nextRev, cntr);
+     }
+     catch (RocksDBException e) {
+         throw new IgniteInternalException(e);
+     }
+     finally {
+         writeLock.unlock();
+     }
+ }
+
+ /**
+  * {@inheritDoc}
+  * <p>
+  * The latest entry of every requested key is returned (in the order of {@code keys}); tombstones are written
+  * only for keys that currently have a live entry.
+  */
+ @NotNull
+ @Override public Collection<Entry> getAndRemoveAll(List<byte[]> keys) {
+     Collection<Entry> previous = new ArrayList<>(keys.size());
+
+     var writeLock = rwLock.writeLock();
+
+     writeLock.lock();
+
+     try (WriteBatch writeBatch = new WriteBatch()) {
+         long nextRev = rev + 1;
+
+         List<byte[]> liveKeys = new ArrayList<>(keys.size());
+         List<byte[]> tombstones = new ArrayList<>(keys.size());
+
+         for (byte[] key : keys) {
+             Entry current = doGet(key, LATEST_REV, false);
+
+             previous.add(current);
+
+             if (!current.empty() && !current.tombstone()) {
+                 liveKeys.add(key);
+
+                 tombstones.add(TOMBSTONE);
+             }
+         }
+
+         long nextCntr = addAllToBatch(writeBatch, liveKeys, tombstones, nextRev);
+
+         liveKeys.forEach(k -> updateKeysIndex(writeBatch, k, nextRev));
+
+         fillAndWriteBatch(writeBatch, nextRev, nextCntr);
+
+         return previous;
+     }
+     catch (RocksDBException e) {
+         throw new IgniteInternalException(e);
+     }
+     finally {
+         writeLock.unlock();
+     }
+ }
+
+ /**
+ * {@inheritDoc}
+ * <p>
+ * The condition is tested against the latest entry of its key, then either the {@code success} or the
+ * {@code failure} operations are collected into a single batch. The batch is written (and the revision advanced)
+ * only if at least one operation actually modified something.
+ */
+ @Override public boolean invoke(Condition condition, Collection<Operation> success, Collection<Operation> failure) {
+ rwLock.writeLock().lock();
+
+ try (WriteBatch batch = new WriteBatch()) {
+ Entry e = get(condition.key());
+
+ boolean branch = condition.test(e);
+
+ Collection<Operation> ops = branch ? success : failure;
+
+ long curRev = rev + 1;
+
+ boolean modified = false;
+
+ // Running update counter: incremented once per applied PUT or successful REMOVE.
+ long counter = updCntr;
+
+ List<byte[]> updatedKeys = new ArrayList<>();
+
+ for (Operation op : ops) {
+ byte[] key = op.key();
+
+ switch (op.type()) {
+ case PUT:
+ counter++;
+
+ addDataToBatch(batch, key, op.value(), curRev, counter);
+
+ updatedKeys.add(key);
+
+ modified = true;
+
+ break;
+
+ case REMOVE:
+ counter++;
+
+ boolean removed = addToBatchForRemoval(batch, key, curRev, counter);
+
+ // Nothing was removed: give the speculatively taken counter value back.
+ if (!removed)
+ counter--;
+ else
+ updatedKeys.add(key);
+
+ modified |= removed;
+
+ break;
+
+ case NO_OP:
+ break;
+
+ default:
+ throw new IllegalArgumentException("Unknown operation type: " + op.type());
+ }
+ }
+
+ if (modified) {
+ for (byte[] key : updatedKeys)
+ updateKeysIndex(batch, key, curRev);
+
+ fillAndWriteBatch(batch, curRev, counter);
+ }
+
+ // The result reports which branch was taken, not whether anything was modified.
+ return branch;
+ }
+ catch (RocksDBException e) {
+ throw new IgniteInternalException(e);
+ }
+ finally {
+ rwLock.writeLock().unlock();
+ }
+ }
+
+ /**
+ * {@inheritDoc}
+ * <p>
+ * The cursor is bounded by the storage revision read at the moment of this call.
+ */
+ @Override public Cursor<Entry> range(byte[] keyFrom, byte[] keyTo) {
+ return new RangeCursor(this, keyFrom, keyTo, rev);
+ }
+
+ /**
+ * {@inheritDoc}
+ * <p>
+ * The cursor is bounded by the given revision.
+ */
+ @Override public Cursor<Entry> range(byte[] keyFrom, byte[] keyTo, long revUpperBound) {
+ return new RangeCursor(this, keyFrom, keyTo, revUpperBound);
+ }
+
+ /**
+  * {@inheritDoc}
+  * <p>
+  * Watches keys in the range {@code [keyFrom, keyTo)}; a {@code null} upper bound means the range is open-ended.
+  */
+ @Override public Cursor<WatchEvent> watch(byte[] keyFrom, byte @Nullable [] keyTo, long rev) {
+     assert keyFrom != null : "keyFrom couldn't be null.";
+     assert rev > 0 : "rev must be positive.";
+
+     return new WatchCursor(this, rev, k -> {
+         // Below the inclusive lower bound.
+         if (CMP.compare(keyFrom, k) > 0)
+             return false;
+
+         return keyTo == null || CMP.compare(k, keyTo) < 0;
+     });
+ }
+
+ /**
+  * {@inheritDoc}
+  * <p>
+  * Watches a single key, matched by byte-wise equality.
+  */
+ @Override public Cursor<WatchEvent> watch(byte[] key, long rev) {
+     assert key != null : "key couldn't be null.";
+     assert rev > 0 : "rev must be positive.";
+
+     return new WatchCursor(this, rev, k -> Arrays.equals(k, key));
+ }
+
+ /**
+  * {@inheritDoc}
+  * <p>
+  * Watches the given set of keys; the keys are copied into a set ordered by {@link #CMP} for lookups.
+  */
+ @Override public Cursor<WatchEvent> watch(Collection<byte[]> keys, long rev) {
+     assert keys != null && !keys.isEmpty() : "keys couldn't be null or empty: " + keys;
+     assert rev > 0 : "rev must be positive.";
+
+     TreeSet<byte[]> watched = new TreeSet<>(CMP);
+
+     watched.addAll(keys);
+
+     return new WatchCursor(this, rev, k -> watched.contains(k));
+ }
+
+ /**
+ * {@inheritDoc}
+ * <p>
+ * Walks the whole index column family under the write lock, collects the per-key compaction operations into
+ * one batch and writes it. The revision and the update counter are re-written with their current values.
+ */
+ @Override public void compact() {
+ rwLock.writeLock().lock();
+
+ try (WriteBatch batch = new WriteBatch()) {
+ // The iterator is closed before the batch is written.
+ try (RocksIterator iterator = index.newIterator()) {
+ iterator.seekToFirst();
+
+ forEach(iterator, (key, value) -> compactForKey(batch, key, getAsLongs(value)));
+ }
+
+ fillAndWriteBatch(batch, rev, updCntr);
+ }
+ catch (RocksDBException e) {
+ throw new IgniteInternalException(e);
+ }
+ finally {
+ rwLock.writeLock().unlock();
+ }
+ }
+
+ /**
+  * Adds a tombstone for the key to the batch, provided the key currently has a live (non-empty,
+  * non-tombstone) entry.
+  *
+  * @param batch Write batch.
+  * @param key Target key.
+  * @param curRev Revision.
+  * @param counter Update counter.
+  * @return {@code true} if a tombstone was added to the batch, {@code false} if there was nothing to remove.
+  * @throws RocksDBException If failed.
+  */
+ private boolean addToBatchForRemoval(WriteBatch batch, byte[] key, long curRev, long counter) throws RocksDBException {
+     Entry latest = doGet(key, LATEST_REV, false);
+
+     boolean live = !latest.empty() && !latest.tombstone();
+
+     if (live)
+         addDataToBatch(batch, key, TOMBSTONE, curRev, counter);
+
+     return live;
+ }
+
+ /**
+  * Compacts all entries by the given key, removing all previous revisions and deleting the last entry if it is
+  * a tombstone.
+  *
+  * <p>Values live in the data column family under revision-prefixed keys, while the index column family maps a
+  * plain key to its list of revisions. All operations are only added to the batch; nothing is written here.
+  *
+  * @param batch Write batch.
+  * @param key Target key.
+  * @param revs Revisions of the key as read from the index; must not be empty.
+  * @throws RocksDBException If failed.
+  */
+ private void compactForKey(WriteBatch batch, byte[] key, long[] revs) throws RocksDBException {
+     long lastRev = lastRevision(revs);
+
+     // Every revision except the last one is obsolete: drop its value from the data column family.
+     for (int i = 0; i < revs.length - 1; i++)
+         data.delete(batch, keyToRocksKey(revs[i], key));
+
+     byte[] rocksKey = keyToRocksKey(lastRev, key);
+
+     Value value = bytesToValue(data.get(rocksKey));
+
+     if (value.tombstone()) {
+         // The revision-prefixed key exists only in the data column family, so the tombstone value itself
+         // has to be deleted from there (deleting it from the index would be a no-op and leave it behind).
+         data.delete(batch, rocksKey);
+
+         // The key is gone completely: remove its revision list from the index as well.
+         index.delete(batch, key);
+     }
+     else
+         // Only the last revision survives, so the index entry shrinks to that single revision.
+         index.put(batch, key, longToBytes(lastRev));
+ }
+
+ /**
+  * Reads the entries for all given keys under a single read lock.
+  *
+  * @param keys Target keys, neither {@code null} nor empty.
+  * @param rev Revision upper bound (inclusive) or {@code LATEST_REV}.
+  * @return Entries in the iteration order of {@code keys}.
+  */
+ @NotNull
+ private Collection<Entry> doGetAll(Collection<byte[]> keys, long rev) {
+     assert keys != null : "keys list can't be null.";
+     assert !keys.isEmpty() : "keys list can't be empty.";
+     assert rev > 0 || rev == LATEST_REV : "Revision must be positive or " + LATEST_REV + '.';
+
+     rwLock.readLock().lock();
+
+     try {
+         Collection<Entry> entries = new ArrayList<>(keys.size());
+
+         // The revision is treated as an upper bound, not as an exact match.
+         for (byte[] k : keys)
+             entries.add(doGet(k, rev, false));
+
+         return entries;
+     }
+     finally {
+         rwLock.readLock().unlock();
+     }
+ }
+
+ /**
+  * Looks up the entry of a key at a revision.
+  *
+  * @param key Target key.
+  * @param rev Target revision, or {@code LATEST_REV} for the last known revision of the key.
+  * @param exactRev {@code true} to read exactly {@code rev}, {@code false} to treat {@code rev} as an
+  *      inclusive upper bound. Must be {@code false} when {@code rev} is {@code LATEST_REV}.
+  * @return Entry, possibly an empty one if the key has no suitable revision.
+  */
+ @NotNull
+ Entry doGet(byte[] key, long rev, boolean exactRev) {
+     assert (rev == LATEST_REV && !exactRev) || rev > LATEST_REV :
+         "Invalid arguments: [rev=" + rev + ", exactRev=" + exactRev + ']';
+
+     long[] revs;
+
+     try {
+         revs = getRevisions(key);
+     }
+     catch (RocksDBException e) {
+         throw new IgniteInternalException(e);
+     }
+
+     // The key has never been written.
+     if (revs == null || revs.length == 0)
+         return Entry.empty(key);
+
+     long targetRev;
+
+     if (rev == LATEST_REV)
+         targetRev = lastRevision(revs);
+     else if (exactRev)
+         targetRev = rev;
+     else
+         targetRev = maxRevision(revs, rev);
+
+     // maxRevision() reports -1 when no revision fits under the bound.
+     return targetRev == -1 ? Entry.empty(key) : doGetValue(key, targetRev);
+ }
+
+ /**
+  * Reads the revisions recorded in the index for the given key.
+  *
+  * @param key Key.
+  * @return Revisions of the key, or an empty array if the index has no record for it.
+  * @throws RocksDBException If failed to perform {@link RocksDB#get(ColumnFamilyHandle, byte[])}.
+  */
+ private long[] getRevisions(byte[] key) throws RocksDBException {
+     byte[] packedRevs = index.get(key);
+
+     return packedRevs == null ? LONG_EMPTY_ARRAY : getAsLongs(packedRevs);
+ }
+
+ /**
+  * Scans the revisions from the end and returns the first one that does not exceed {@code upperBoundRev}.
+  * For a list sorted in ascending order (presumably how revisions are appended - confirm against the writer)
+  * this is the maximum revision less than or equal to the bound.
+  *
+  * @param revs Revisions list.
+  * @param upperBoundRev Revision upper bound (inclusive).
+  * @return Found revision or {@code -1} if there is no such revision.
+  */
+ static long maxRevision(long[] revs, long upperBoundRev) {
+     int idx = revs.length - 1;
+
+     // Skip the tail of revisions that are above the bound.
+     while (idx >= 0 && revs[idx] > upperBoundRev)
+         idx--;
+
+     return idx >= 0 ? revs[idx] : -1;
+ }
+
+ /**
+  * Reads the value stored for a key at exactly the given revision and wraps it into an entry.
+  *
+  * @param key Target key.
+  * @param revision Target revision; {@code 0} always yields an empty entry.
+  * @return Regular entry, tombstone entry, or an empty entry if nothing is stored under that revision.
+  */
+ @NotNull
+ Entry doGetValue(byte[] key, long revision) {
+     if (revision == 0)
+         return Entry.empty(key);
+
+     byte[] raw;
+
+     try {
+         raw = data.get(keyToRocksKey(revision, key));
+     }
+     catch (RocksDBException e) {
+         throw new IgniteInternalException(e);
+     }
+
+     boolean missing = raw == null || raw.length == 0;
+
+     if (missing)
+         return Entry.empty(key);
+
+     Value stored = bytesToValue(raw);
+
+     if (!stored.tombstone())
+         return new Entry(key, stored.bytes(), revision, stored.updateCounter());
+
+     return Entry.tombstone(key, revision, stored.updateCounter());
+ }
+
+ /**
+  * Puts a single value into the batch, targeting the data column family. The stored key is the user key
+  * combined with the revision; the stored value carries the update counter.
+  *
+  * @param batch Write batch.
+  * @param key Key.
+  * @param value Value.
+  * @param curRev Revision.
+  * @param cntr Update counter.
+  * @throws RocksDBException If failed.
+  */
+ private void addDataToBatch(WriteBatch batch, byte[] key, byte[] value, long curRev, long cntr) throws RocksDBException {
+     data.put(batch, keyToRocksKey(curRev, key), valueToBytes(value, cntr));
+ }
+
+ /**
+  * Puts all key-value pairs into the batch under the same revision. Each pair receives its own update
+  * counter, continuing from the storage's current counter.
+  *
+  * @param batch Write batch.
+  * @param keys Keys.
+  * @param values Values, positionally matching {@code keys}.
+  * @param curRev Revision.
+  * @return New update counter value.
+  * @throws RocksDBException If failed.
+  */
+ private long addAllToBatch(WriteBatch batch, List<byte[]> keys, List<byte[]> values, long curRev) throws RocksDBException {
+     long base = this.updCntr;
+
+     int cnt = keys.size();
+
+     // The i-th pair (zero-based) is written with counter base + i + 1.
+     for (int i = 0; i < cnt; i++)
+         addDataToBatch(batch, keys.get(i), values.get(i), curRev, base + i + 1);
+
+     return base + cnt;
+ }
+
+ /**
+ * Gets an entry from the keys index with the least key greater than or equal to the specified key.
+ * The entry maps the found key to the array of its revisions.
+ *
+ * @param keyFrom Key.
+ * @return Higher or equal entry. Returns {@code null} if no such entry exists.
+ */
+ @Nullable
+ Map.Entry<byte[], long[]> revisionCeilingEntry(byte[] keyFrom) {
+ // "false" selects the non-strict (ceiling) comparison.
+ return higherOrCeiling(keyFrom, false);
+ }
+
+ /**
+ * Gets an entry from the keys index with the least key greater than the specified key.
+ * The entry maps the found key to the array of its revisions.
+ *
+ * @param key Key.
+ * @return Higher entry or {@code null} if no such entry exists.
+ */
+ @Nullable
+ Map.Entry<byte[], long[]> revisionHigherEntry(byte[] key) {
+ // "true" selects the strict (higher) comparison.
+ return higherOrCeiling(key, true);
+ }
+
+ /**
+  * Searches the keys index, starting from the position of the specified key, for the first key that is
+  * either strictly greater than or greater than or equal to it.
+  *
+  * @param key Key to start the search from.
+  * @param strictlyHigher {@code true} for a strictly higher entry, {@code false} for a ceiling one.
+  * @return Tuple of the found key and its revisions, or {@code null} if no such entry exists.
+  */
+ @Nullable
+ private IgniteBiTuple<byte[], long[]> higherOrCeiling(byte[] key, boolean strictlyHigher) {
+     try (RocksIterator it = index.newIterator()) {
+         it.seek(key);
+
+         boolean found = strictlyHigher
+             ? find(it, (k, v) -> CMP.compare(k, key) > 0)
+             : find(it, (k, v) -> CMP.compare(k, key) >= 0);
+
+         if (found)
+             // Key and value must be read while the iterator is still open.
+             return new IgniteBiTuple<>(it.key(), getAsLongs(it.value()));
+
+         return null;
+     }
+     catch (RocksDBException e) {
+         throw new IgniteInternalException(e);
+     }
+ }
+
+ /**
+ * Creates a new iterator over the {@link StorageColumnFamilyType#DATA} column family.
+ * The iterator is not tracked by the storage, so the caller is expected to close it.
+ *
+ * @param options Read options.
+ * @return Iterator.
+ */
+ public RocksIterator newDataIterator(ReadOptions options) {
+ return data.newIterator(options);
+ }
+
+ /**
+  * Returns the element at the last position of the revisions array.
+  *
+  * @param revs Revisions, must not be empty.
+  * @return Last revision.
+  */
+ private static long lastRevision(long[] revs) {
+     int lastIdx = revs.length - 1;
+
+     return revs[lastIdx];
+ }
+
+ /**
+ * Exposes the read-write lock of this storage, i.e. the same lock that {@code compact()} takes for writing.
+ *
+ * @return Database lock
+ */
+ ReadWriteLock lock() {
+ return rwLock;
+ }
+
+ /**
+ * Exposes the underlying RocksDB instance.
+ *
+ * @return Database.
+ */
+ RocksDB db() {
+ return db;
+ }
+
+ /**
+ * Test-only accessor for the database path.
+ *
+ * @return Value of the {@code dbPath} field.
+ */
+ @TestOnly
+ public Path getDbPath() {
+ return dbPath;
+ }
+}
diff --git a/modules/metastorage-server/src/main/java/org/apache/ignite/internal/metastorage/server/persistence/RocksStorageUtils.java b/modules/metastorage-server/src/main/java/org/apache/ignite/internal/metastorage/server/persistence/RocksStorageUtils.java
new file mode 100644
index 0000000..54d13d7
--- /dev/null
+++ b/modules/metastorage-server/src/main/java/org/apache/ignite/internal/metastorage/server/persistence/RocksStorageUtils.java
@@ -0,0 +1,267 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.metastorage.server.persistence;
+
+import java.lang.invoke.MethodHandles;
+import java.lang.invoke.VarHandle;
+import java.nio.ByteOrder;
+import java.util.Arrays;
+import java.util.stream.IntStream;
+import org.apache.ignite.internal.metastorage.server.Value;
+import org.apache.ignite.lang.IgniteInternalException;
+import org.jetbrains.annotations.NotNull;
+import org.jetbrains.annotations.Nullable;
+import org.rocksdb.RocksDBException;
+import org.rocksdb.RocksIterator;
+
+import static org.apache.ignite.internal.metastorage.server.Value.TOMBSTONE;
+
+/**
+ * Utility class for {@link RocksDBKeyValueStorage}.
+ */
+class RocksStorageUtils {
+ /**
+ * VarHandle that gives the access to the elements of a {@code byte[]} array viewed as if it
+ * were a {@code long[]} array. Byte order must be little endian for a correct
+ * lexicographic order comparison.
+ */
+ private static final VarHandle LONG_ARRAY_HANDLE = MethodHandles.byteArrayViewVarHandle(
+ long[].class,
+ ByteOrder.LITTLE_ENDIAN
+ );
+
+ /**
+ * Converts a long value to a byte array.
+ *
+ * @param value Value.
+ * @return Byte array.
+ */
+ static byte[] longToBytes(long value) {
+ var buffer = new byte[Long.BYTES];
+
+ LONG_ARRAY_HANDLE.set(buffer, 0, value);
+
+ return buffer;
+ }
+
+ /**
+ * Converts a byte array to a long value.
+ *
+ * @param array Byte array.
+ * @return Long value.
+ */
+ static long bytesToLong(byte[] array) {
+ assert array.length == Long.BYTES;
+
+ return (long) LONG_ARRAY_HANDLE.get(array, 0);
+ }
+
+ /**
+ * Adds a revision to a key.
+ *
+ * @param revision Revision.
+ * @param key Key.
+ * @return Key with a revision.
+ */
+ static byte[] keyToRocksKey(long revision, byte[] key) {
+ var buffer = new byte[Long.BYTES + key.length];
+
+ LONG_ARRAY_HANDLE.set(buffer, 0, revision);
+
+ System.arraycopy(key, 0, buffer, Long.BYTES, key.length);
+
+ return buffer;
+ }
+
+ /**
+ * Gets a key from a key with revision.
+ *
+ * @param rocksKey Key with a revision.
+ * @return Key without a revision.
+ */
+ static byte[] rocksKeyToBytes(byte[] rocksKey) {
+ // Copy bytes of the rocks key ignoring the revision (first 8 bytes)
+ return Arrays.copyOfRange(rocksKey, Long.BYTES, rocksKey.length);
+ }
+
+ /**
+ * Builds a value from a byte array.
+ *
+ * @param valueBytes Value byte array.
+ * @return Value.
+ */
+ static Value bytesToValue(byte[] valueBytes) {
+ // At least an 8-byte update counter and a 1-byte boolean
+ assert valueBytes.length > Long.BYTES;
+
+ // Read an update counter (8-byte long) from the entry.
+ long updateCounter = (long) LONG_ARRAY_HANDLE.get(valueBytes, 0);
+
+ // Read a has-value flag (1 byte) from the entry.
+ boolean hasValue = valueBytes[Long.BYTES] != 0;
+
+ byte[] val;
+ if (hasValue)
+ // Copy the value.
+ val = Arrays.copyOfRange(valueBytes, Long.BYTES + 1, valueBytes.length);
+ else
+ // There is no value, mark it as a tombstone.
+ val = TOMBSTONE;
+
+ return new Value(val, updateCounter);
+ }
+
+ /**
+ * Adds an update counter and a tombstone flag to a value.
+ * @param value Value byte array.
+ * @param updateCounter Update counter.
+ * @return Value with an update counter and a tombstone.
+ */
+ static byte[] valueToBytes(byte[] value, long updateCounter) {
+ var bytes = new byte[Long.BYTES + Byte.BYTES + value.length];
+
+ LONG_ARRAY_HANDLE.set(bytes, 0, updateCounter);
+
+ bytes[Long.BYTES] = (byte) (value == TOMBSTONE ? 0 : 1);
+
+ System.arraycopy(value, 0, bytes, Long.BYTES + Byte.BYTES, value.length);
+
+ return bytes;
+ }
+
+ /**
+ * Gets an array of longs from the byte array of longs.
+ *
+ * @param bytes Byte array of longs.
+ * @return Array of longs.
+ */
+ @NotNull
+ static long[] getAsLongs(byte[] bytes) {
+ // Value must be divisible by a size of a long, because it's a list of longs
+ assert (bytes.length % Long.BYTES) == 0;
+
+ return IntStream.range(0, bytes.length / Long.BYTES)
+ .mapToLong(i -> (long) LONG_ARRAY_HANDLE.get(bytes, i * Long.BYTES))
+ .toArray();
+ }
+
+ /**
+ * Add a long value to an array of longs that is represented by an array of bytes.
+ *
+ * @param bytes Byte array that represents an array of longs.
+ * @param value New long value.
+ * @return Byte array with a new value.
+ */
+ static byte @NotNull [] appendLong(byte @Nullable [] bytes, long value) {
+ if (bytes == null)
+ return longToBytes(value);
+
+ // Allocate a one long size bigger array
+ var result = new byte[bytes.length + Long.BYTES];
+
+ // Copy the current value
+ System.arraycopy(bytes, 0, result, 0, bytes.length);
+
+ LONG_ARRAY_HANDLE.set(result, bytes.length, value);
+
+ return result;
+ }
+
+ /**
+ * Iterates over the given iterator passing key-value pairs to the given consumer and
+ * checks the iterator's status afterwards.
+ *
+ * @param iterator Iterator.
+ * @param consumer Consumer of key-value pairs.
+ * @throws RocksDBException If failed.
+ */
+ static void forEach(RocksIterator iterator, RocksBiConsumer consumer) throws RocksDBException {
+ for (; iterator.isValid(); iterator.next())
+ consumer.accept(iterator.key(), iterator.value());
+
+ checkIterator(iterator);
+ }
+
+ /**
+ * Iterates over the given iterator testing key-value pairs with the given predicate and checks
+ * the iterator's status afterwards.
+ *
+ * @param iterator Iterator.
+ * @param consumer Consumer of key-value pairs.
+ * @return {@code true} if a matching key-value pair has been found, {@code false} otherwise.
+ * @throws RocksDBException If failed.
+ */
+ static boolean find(RocksIterator iterator, RocksBiPredicate consumer) throws RocksDBException {
+ for (; iterator.isValid(); iterator.next()) {
+ boolean result = consumer.test(iterator.key(), iterator.value());
+
+ if (result)
+ return true;
+ }
+
+ checkIterator(iterator);
+
+ return false;
+ }
+
+ /**
+ * Checks the status of the iterator and throws an exception if it is not correct.
+ *
+ * @param it RocksDB iterator.
+ * @throws IgniteInternalException if the iterator has an incorrect status.
+ */
+ static void checkIterator(RocksIterator it) {
+ try {
+ it.status();
+ }
+ catch (RocksDBException e) {
+ throw new IgniteInternalException(e);
+ }
+ }
+
+ /**
+ * BiConsumer that can throw {@link RocksDBException}.
+ */
+ @FunctionalInterface
+ interface RocksBiConsumer {
+ /**
+ * Accepts the key and the value of the entry.
+ *
+ * @param key Key.
+ * @param value Value.
+ * @throws RocksDBException If failed to process the key-value pair.
+ */
+ void accept(byte[] key, byte[] value) throws RocksDBException;
+ }
+
+ /**
+ * BiPredicate that can throw {@link RocksDBException}.
+ */
+ @FunctionalInterface
+ interface RocksBiPredicate {
+ /**
+ * Evaluates the predicate on the given key and the given value.
+ *
+ * @param key Key.
+ * @param value Value.
+ * @return {@code true} if the input argument matches the predicate, otherwise {@code false}.
+ * @throws RocksDBException If failed to test the key-value pair.
+ */
+ boolean test(byte[] key, byte[] value) throws RocksDBException;
+ }
+}
diff --git a/modules/metastorage-server/src/main/java/org/apache/ignite/internal/metastorage/server/persistence/StorageColumnFamilyType.java b/modules/metastorage-server/src/main/java/org/apache/ignite/internal/metastorage/server/persistence/StorageColumnFamilyType.java
new file mode 100644
index 0000000..852bf02
--- /dev/null
+++ b/modules/metastorage-server/src/main/java/org/apache/ignite/internal/metastorage/server/persistence/StorageColumnFamilyType.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.metastorage.server.persistence;
+
+import java.nio.charset.StandardCharsets;
+import org.rocksdb.RocksDB;
+
+/**
+ * A type of the column family.
+ */
+enum StorageColumnFamilyType {
+ /** Column family for the data. */
+ DATA(RocksDB.DEFAULT_COLUMN_FAMILY),
+
+ /** Column family for the index. Index is a mapping from entry key to a list of revisions of the storage. */
+ INDEX("INDEX".getBytes(StandardCharsets.UTF_8));
+
+ /** Byte representation of the column family's name. */
+ private final byte[] nameAsBytes;
+
+ /**
+ * Constructor.
+ *
+ * @param bytes Column family name's bytes.
+ */
+ StorageColumnFamilyType(byte[] bytes) {
+ nameAsBytes = bytes;
+ }
+
+ /**
+ * Gets column family name's bytes.
+ *
+ * @return Column family name's bytes.
+ */
+ public byte[] nameAsBytes() {
+ return nameAsBytes;
+ }
+}
diff --git a/modules/metastorage-server/src/main/java/org/apache/ignite/internal/metastorage/server/persistence/WatchCursor.java b/modules/metastorage-server/src/main/java/org/apache/ignite/internal/metastorage/server/persistence/WatchCursor.java
new file mode 100644
index 0000000..90ff5f6
--- /dev/null
+++ b/modules/metastorage-server/src/main/java/org/apache/ignite/internal/metastorage/server/persistence/WatchCursor.java
@@ -0,0 +1,233 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.metastorage.server.persistence;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.function.Predicate;
+import org.apache.ignite.internal.metastorage.server.Entry;
+import org.apache.ignite.internal.metastorage.server.EntryEvent;
+import org.apache.ignite.internal.metastorage.server.Value;
+import org.apache.ignite.internal.metastorage.server.WatchEvent;
+import org.apache.ignite.internal.util.Cursor;
+import org.apache.ignite.internal.util.IgniteUtils;
+import org.apache.ignite.lang.IgniteInternalException;
+import org.jetbrains.annotations.NotNull;
+import org.jetbrains.annotations.Nullable;
+import org.rocksdb.ReadOptions;
+import org.rocksdb.RocksDBException;
+import org.rocksdb.RocksIterator;
+
+import static org.apache.ignite.internal.metastorage.server.persistence.RocksStorageUtils.bytesToValue;
+import static org.apache.ignite.internal.metastorage.server.persistence.RocksStorageUtils.checkIterator;
+import static org.apache.ignite.internal.metastorage.server.persistence.RocksStorageUtils.longToBytes;
+import static org.apache.ignite.internal.metastorage.server.persistence.RocksStorageUtils.rocksKeyToBytes;
+
+/**
+ * Subscription on updates of entries corresponding to the given keys range (where the upper bound is unlimited)
+ * and starting from the given revision number.
+ */
+class WatchCursor implements Cursor<WatchEvent> {
+ /** Storage. */
+ private final RocksDBKeyValueStorage storage;
+
+ /** Key predicate. */
+ private final Predicate<byte[]> p;
+
+ /** Iterator for this cursor. */
+ private final Iterator<WatchEvent> it;
+
+ /** Options for {@link #nativeIterator}. */
+ private final ReadOptions options = new ReadOptions().setPrefixSameAsStart(true);
+
+ /** RocksDB iterator. */
+ private final RocksIterator nativeIterator;
+
+ /**
+ * Last matching revision.
+ */
+ private long lastRetRev;
+
+ /**
+ * Next matching revision. {@code -1} means that it has not been found yet or does not exist.
+ */
+ private long nextRetRev = -1;
+
+ /**
+ * Constructor.
+ *
+ * @param storage Storage.
+ * @param rev Starting revision.
+ * @param p Key predicate.
+ */
+ WatchCursor(RocksDBKeyValueStorage storage, long rev, Predicate<byte[]> p) {
+ this.storage = storage;
+ this.p = p;
+ this.lastRetRev = rev - 1;
+ this.nativeIterator = storage.newDataIterator(options);
+ this.it = createIterator();
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean hasNext() {
+ return it.hasNext();
+ }
+
+ /** {@inheritDoc} */
+ @Nullable
+ @Override public WatchEvent next() {
+ return it.next();
+ }
+
+ /** {@inheritDoc} */
+ @Override public void close() throws Exception {
+ IgniteUtils.closeAll(options, nativeIterator);
+ }
+
+ /** {@inheritDoc} */
+ @NotNull
+ @Override public Iterator<WatchEvent> iterator() {
+ return it;
+ }
+
+ /**
+ * Creates an iterator for this cursor.
+ *
+ * @return Iterator.
+ */
+ @NotNull
+ private Iterator<WatchEvent> createIterator() {
+ return new Iterator<>() {
+ /** {@inheritDoc} */
+ @Override public boolean hasNext() {
+ storage.lock().readLock().lock();
+
+ try {
+ if (nextRetRev != -1)
+ // Next revision is already calculated and is not -1, meaning that there is a set of keys
+ // matching the revision and the predicate.
+ return true;
+
+ while (true) {
+ long curRev = lastRetRev + 1;
+
+ byte[] revisionPrefix = longToBytes(curRev);
+
+ boolean empty = true;
+
+ if (!nativeIterator.isValid()) {
+ try {
+ nativeIterator.refresh();
+ }
+ catch (RocksDBException e) {
+ throw new IgniteInternalException(e);
+ }
+ }
+
+ // Check all keys by the revision to see if any one of them match the predicate.
+ for (nativeIterator.seek(revisionPrefix); nativeIterator.isValid(); nativeIterator.next()) {
+ empty = false;
+
+ byte[] key = rocksKeyToBytes(nativeIterator.key());
+
+ if (p.test(key)) {
+ // Current revision matches.
+ nextRetRev = curRev;
+
+ return true;
+ }
+ }
+
+ checkIterator(nativeIterator);
+
+ if (empty)
+ return false;
+
+ // Go to the next revision.
+ lastRetRev++;
+ }
+ }
+ finally {
+ storage.lock().readLock().unlock();
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Nullable
+ @Override public WatchEvent next() {
+ storage.lock().readLock().lock();
+
+ try {
+ while (true) {
+ if (!hasNext())
+ return null;
+
+ var ref = new Object() {
+ boolean noItemsInRevision = true;
+ };
+
+ List<EntryEvent> evts = new ArrayList<>();
+
+ // Iterate over the keys of the current revision and get all matching entries.
+ RocksStorageUtils.forEach(nativeIterator, (k, v) -> {
+ ref.noItemsInRevision = false;
+
+ byte[] key = rocksKeyToBytes(k);
+
+ Value val = bytesToValue(v);
+
+ if (p.test(key)) {
+ Entry newEntry;
+
+ if (val.tombstone())
+ newEntry = Entry.tombstone(key, nextRetRev, val.updateCounter());
+ else
+ newEntry = new Entry(key, val.bytes(), nextRetRev, val.updateCounter());
+
+ Entry oldEntry = storage.doGet(key, nextRetRev - 1, false);
+
+ evts.add(new EntryEvent(oldEntry, newEntry));
+ }
+ });
+
+ if (ref.noItemsInRevision)
+ return null;
+
+ if (evts.isEmpty())
+ continue;
+
+ // Set the last returned revision to the current revision's value.
+ lastRetRev = nextRetRev;
+
+ // Set current revision to -1, meaning that it is not found yet.
+ nextRetRev = -1;
+
+ return new WatchEvent(evts);
+ }
+ }
+ catch (RocksDBException e) {
+ throw new IgniteInternalException(e);
+ }
+ finally {
+ storage.lock().readLock().unlock();
+ }
+ }
+ };
+ }
+}
diff --git a/modules/metastorage-server/src/main/java/org/apache/ignite/internal/metastorage/server/raft/MetaStorageListener.java b/modules/metastorage-server/src/main/java/org/apache/ignite/internal/metastorage/server/raft/MetaStorageListener.java
index 38f76da..55a7b44 100644
--- a/modules/metastorage-server/src/main/java/org/apache/ignite/internal/metastorage/server/raft/MetaStorageListener.java
+++ b/modules/metastorage-server/src/main/java/org/apache/ignite/internal/metastorage/server/raft/MetaStorageListener.java
@@ -17,6 +17,7 @@
package org.apache.ignite.internal.metastorage.server.raft;
+import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
@@ -65,6 +66,7 @@ import org.apache.ignite.raft.client.ReadCommand;
import org.apache.ignite.raft.client.WriteCommand;
import org.apache.ignite.raft.client.service.CommandClosure;
import org.apache.ignite.raft.client.service.RaftGroupListener;
+import org.jetbrains.annotations.TestOnly;
/**
* Meta storage listener.
@@ -314,14 +316,34 @@ public class MetaStorageListener implements RaftGroupListener {
}
/** {@inheritDoc} */
- @Override public void onSnapshotSave(String path, Consumer<Throwable> doneClo) {
- // Not implemented yet.
+ @Override public void onSnapshotSave(Path path, Consumer<Throwable> doneClo) {
+ storage.snapshot(path).whenComplete((unused, throwable) -> {
+ doneClo.accept(throwable);
+ });
}
/** {@inheritDoc} */
- @Override public boolean onSnapshotLoad(String path) {
- // Not implemented yet.
- return false;
+ @Override public boolean onSnapshotLoad(Path path) {
+ storage.restoreSnapshot(path);
+ return true;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void onShutdown() {
+ try {
+ storage.close();
+ }
+ catch (Exception e) {
+ // Any failure to close the storage is rethrown with the original exception kept as the cause.
+ throw new IgniteInternalException("Failed to close storage: " + e.getMessage(), e);
+ }
+ }
+
+ /**
+ * Test-only accessor for the storage of this listener.
+ *
+ * @return {@link KeyValueStorage} that is backing this listener.
+ */
+ @TestOnly
+ public KeyValueStorage getStorage() {
+ return storage;
}
/** */
diff --git a/modules/metastorage-server/src/test/java/org/apache/ignite/internal/metastorage/server/SimpleInMemoryKeyValueStorageTest.java b/modules/metastorage-server/src/test/java/org/apache/ignite/internal/metastorage/server/AbstractKeyValueStorageTest.java
similarity index 92%
copy from modules/metastorage-server/src/test/java/org/apache/ignite/internal/metastorage/server/SimpleInMemoryKeyValueStorageTest.java
copy to modules/metastorage-server/src/test/java/org/apache/ignite/internal/metastorage/server/AbstractKeyValueStorageTest.java
index 508424f..24c7253 100644
--- a/modules/metastorage-server/src/test/java/org/apache/ignite/internal/metastorage/server/SimpleInMemoryKeyValueStorageTest.java
+++ b/modules/metastorage-server/src/test/java/org/apache/ignite/internal/metastorage/server/AbstractKeyValueStorageTest.java
@@ -26,8 +26,10 @@ import java.util.stream.Collectors;
import org.apache.ignite.internal.metastorage.common.OperationType;
import org.apache.ignite.internal.util.Cursor;
import org.apache.ignite.lang.ByteArray;
+import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
+
import static java.util.function.Function.identity;
import static org.junit.jupiter.api.Assertions.assertArrayEquals;
import static org.junit.jupiter.api.Assertions.assertEquals;
@@ -38,17 +40,27 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;
/**
- * Tests for in-memory meta storage implementation.
+ * Tests for key-value storage implementations.
*/
-class SimpleInMemoryKeyValueStorageTest {
+public abstract class AbstractKeyValueStorageTest {
/** */
private KeyValueStorage storage;
@BeforeEach
public void setUp() {
- storage = new SimpleInMemoryKeyValueStorage();
+ storage = storage();
+ }
+
+ @AfterEach
+ void tearDown() throws Exception {
+ storage.close();
}
+ /**
+ * @return Key value storage for this test.
+ */
+ abstract KeyValueStorage storage();
+
@Test
public void put() {
byte[] key = k(1);
@@ -967,12 +979,12 @@ class SimpleInMemoryKeyValueStorageTest {
assertEquals(1, storage.updateCounter());
boolean branch = storage.invoke(
- new RevisionCondition(RevisionCondition.Type.EQUAL, key1, 1),
- List.of(
- new Operation(OperationType.PUT, key1, val1_2),
- new Operation(OperationType.PUT, key2, val2)
- ),
- List.of(new Operation(OperationType.PUT, key3, val3))
+ new RevisionCondition(RevisionCondition.Type.EQUAL, key1, 1),
+ List.of(
+ new Operation(OperationType.PUT, key1, val1_2),
+ new Operation(OperationType.PUT, key2, val2)
+ ),
+ List.of(new Operation(OperationType.PUT, key3, val3))
);
// "Success" branch is applied.
@@ -1023,12 +1035,12 @@ class SimpleInMemoryKeyValueStorageTest {
assertEquals(1, storage.updateCounter());
boolean branch = storage.invoke(
- new RevisionCondition(RevisionCondition.Type.EQUAL, key1, 2),
- List.of(new Operation(OperationType.PUT, key3, val3)),
- List.of(
- new Operation(OperationType.PUT, key1, val1_2),
- new Operation(OperationType.PUT, key2, val2)
- )
+ new RevisionCondition(RevisionCondition.Type.EQUAL, key1, 2),
+ List.of(new Operation(OperationType.PUT, key3, val3)),
+ List.of(
+ new Operation(OperationType.PUT, key1, val1_2),
+ new Operation(OperationType.PUT, key2, val2)
+ )
);
// "Failure" branch is applied.
@@ -1079,12 +1091,12 @@ class SimpleInMemoryKeyValueStorageTest {
assertEquals(1, storage.updateCounter());
boolean branch = storage.invoke(
- new ExistenceCondition(ExistenceCondition.Type.EXISTS, key1),
- List.of(
- new Operation(OperationType.PUT, key1, val1_2),
- new Operation(OperationType.PUT, key2, val2)
- ),
- List.of(new Operation(OperationType.PUT, key3, val3))
+ new ExistenceCondition(ExistenceCondition.Type.EXISTS, key1),
+ List.of(
+ new Operation(OperationType.PUT, key1, val1_2),
+ new Operation(OperationType.PUT, key2, val2)
+ ),
+ List.of(new Operation(OperationType.PUT, key3, val3))
);
// "Success" branch is applied.
@@ -1135,12 +1147,12 @@ class SimpleInMemoryKeyValueStorageTest {
assertEquals(1, storage.updateCounter());
boolean branch = storage.invoke(
- new ExistenceCondition(ExistenceCondition.Type.EXISTS, key3),
- List.of(new Operation(OperationType.PUT, key3, val3)),
- List.of(
- new Operation(OperationType.PUT, key1, val1_2),
- new Operation(OperationType.PUT, key2, val2)
- )
+ new ExistenceCondition(ExistenceCondition.Type.EXISTS, key3),
+ List.of(new Operation(OperationType.PUT, key3, val3)),
+ List.of(
+ new Operation(OperationType.PUT, key1, val1_2),
+ new Operation(OperationType.PUT, key2, val2)
+ )
);
// "Failure" branch is applied.
@@ -1191,12 +1203,12 @@ class SimpleInMemoryKeyValueStorageTest {
assertEquals(1, storage.updateCounter());
boolean branch = storage.invoke(
- new ExistenceCondition(ExistenceCondition.Type.NOT_EXISTS, key2),
- List.of(
- new Operation(OperationType.PUT, key1, val1_2),
- new Operation(OperationType.PUT, key2, val2)
- ),
- List.of(new Operation(OperationType.PUT, key3, val3))
+ new ExistenceCondition(ExistenceCondition.Type.NOT_EXISTS, key2),
+ List.of(
+ new Operation(OperationType.PUT, key1, val1_2),
+ new Operation(OperationType.PUT, key2, val2)
+ ),
+ List.of(new Operation(OperationType.PUT, key3, val3))
);
// "Success" branch is applied.
@@ -1247,12 +1259,12 @@ class SimpleInMemoryKeyValueStorageTest {
assertEquals(1, storage.updateCounter());
boolean branch = storage.invoke(
- new ExistenceCondition(ExistenceCondition.Type.NOT_EXISTS, key1),
- List.of(new Operation(OperationType.PUT, key3, val3)),
- List.of(
- new Operation(OperationType.PUT, key1, val1_2),
- new Operation(OperationType.PUT, key2, val2)
- )
+ new ExistenceCondition(ExistenceCondition.Type.NOT_EXISTS, key1),
+ List.of(new Operation(OperationType.PUT, key3, val3)),
+ List.of(
+ new Operation(OperationType.PUT, key1, val1_2),
+ new Operation(OperationType.PUT, key2, val2)
+ )
);
// "Failure" branch is applied.
@@ -1303,9 +1315,9 @@ class SimpleInMemoryKeyValueStorageTest {
assertEquals(2, storage.updateCounter());
boolean branch = storage.invoke(
- new TombstoneCondition(key1),
- List.of(new Operation(OperationType.PUT, key2, val2)),
- List.of(new Operation(OperationType.PUT, key3, val3))
+ new TombstoneCondition(key1),
+ List.of(new Operation(OperationType.PUT, key2, val2)),
+ List.of(new Operation(OperationType.PUT, key3, val3))
);
// "Success" branch is applied.
@@ -1355,9 +1367,9 @@ class SimpleInMemoryKeyValueStorageTest {
assertEquals(1, storage.updateCounter());
boolean branch = storage.invoke(
- new TombstoneCondition(key1),
- List.of(new Operation(OperationType.PUT, key2, val2)),
- List.of(new Operation(OperationType.PUT, key3, val3))
+ new TombstoneCondition(key1),
+ List.of(new Operation(OperationType.PUT, key2, val2)),
+ List.of(new Operation(OperationType.PUT, key3, val3))
);
// "Failure" branch is applied.
@@ -1408,12 +1420,12 @@ class SimpleInMemoryKeyValueStorageTest {
assertEquals(1, storage.updateCounter());
boolean branch = storage.invoke(
- new ValueCondition(ValueCondition.Type.EQUAL, key1, val1_1),
- List.of(
- new Operation(OperationType.PUT, key1, val1_2),
- new Operation(OperationType.PUT, key2, val2)
- ),
- List.of(new Operation(OperationType.PUT, key3, val3))
+ new ValueCondition(ValueCondition.Type.EQUAL, key1, val1_1),
+ List.of(
+ new Operation(OperationType.PUT, key1, val1_2),
+ new Operation(OperationType.PUT, key2, val2)
+ ),
+ List.of(new Operation(OperationType.PUT, key3, val3))
);
// "Success" branch is applied.
@@ -1464,12 +1476,12 @@ class SimpleInMemoryKeyValueStorageTest {
assertEquals(1, storage.updateCounter());
boolean branch = storage.invoke(
- new ValueCondition(ValueCondition.Type.EQUAL, key1, val1_2),
- List.of(new Operation(OperationType.PUT, key3, val3)),
- List.of(
- new Operation(OperationType.PUT, key1, val1_2),
- new Operation(OperationType.PUT, key2, val2)
- )
+ new ValueCondition(ValueCondition.Type.EQUAL, key1, val1_2),
+ List.of(new Operation(OperationType.PUT, key3, val3)),
+ List.of(
+ new Operation(OperationType.PUT, key1, val1_2),
+ new Operation(OperationType.PUT, key2, val2)
+ )
);
// "Failure" branch is applied.
@@ -1520,9 +1532,9 @@ class SimpleInMemoryKeyValueStorageTest {
// No-op.
boolean branch = storage.invoke(
- new ValueCondition(ValueCondition.Type.EQUAL, key1, val1),
- List.of(new Operation(OperationType.NO_OP, null, null)),
- List.of(new Operation(OperationType.NO_OP, null, null))
+ new ValueCondition(ValueCondition.Type.EQUAL, key1, val1),
+ List.of(new Operation(OperationType.NO_OP, null, null)),
+ List.of(new Operation(OperationType.NO_OP, null, null))
);
assertTrue(branch);
@@ -1533,12 +1545,12 @@ class SimpleInMemoryKeyValueStorageTest {
// Put.
branch = storage.invoke(
- new ValueCondition(ValueCondition.Type.EQUAL, key1, val1),
- List.of(
- new Operation(OperationType.PUT, key2, val2),
- new Operation(OperationType.PUT, key3, val3)
- ),
- List.of(new Operation(OperationType.NO_OP, null, null))
+ new ValueCondition(ValueCondition.Type.EQUAL, key1, val1),
+ List.of(
+ new Operation(OperationType.PUT, key2, val2),
+ new Operation(OperationType.PUT, key3, val3)
+ ),
+ List.of(new Operation(OperationType.NO_OP, null, null))
);
assertTrue(branch);
@@ -1567,12 +1579,12 @@ class SimpleInMemoryKeyValueStorageTest {
// Remove.
branch = storage.invoke(
- new ValueCondition(ValueCondition.Type.EQUAL, key1, val1),
- List.of(
- new Operation(OperationType.REMOVE, key2, null),
- new Operation(OperationType.REMOVE, key3, null)
- ),
- List.of(new Operation(OperationType.NO_OP, null, null))
+ new ValueCondition(ValueCondition.Type.EQUAL, key1, val1),
+ List.of(
+ new Operation(OperationType.REMOVE, key2, null),
+ new Operation(OperationType.REMOVE, key3, null)
+ ),
+ List.of(new Operation(OperationType.NO_OP, null, null))
);
assertTrue(branch);
@@ -1773,7 +1785,7 @@ class SimpleInMemoryKeyValueStorageTest {
}
@Test
- public void watchCursorForRange() {
+ public void watchCursorForRange() throws Exception {
byte[] key1 = k(1);
byte[] val1_1 = kv(1, 11);
@@ -1817,7 +1829,7 @@ class SimpleInMemoryKeyValueStorageTest {
assertFalse(watchEvent.single());
Map<ByteArray, EntryEvent> map = watchEvent.entryEvents().stream()
- .collect(Collectors.toMap(evt -> new ByteArray(evt.entry().key()), identity()));
+ .collect(Collectors.toMap(evt -> new ByteArray(evt.entry().key()), identity()));
assertEquals(2, map.size());
@@ -1895,6 +1907,8 @@ class SimpleInMemoryKeyValueStorageTest {
assertNull(newEntry1.value());
assertFalse(it.hasNext());
+
+ cur.close();
}
@Test
@@ -1969,7 +1983,7 @@ class SimpleInMemoryKeyValueStorageTest {
assertArrayEquals(key1, newEntry1.key());
assertArrayEquals(val1_1, newEntry1.value());
- newEntry1 = e1.entry();
+ newEntry1 = e1.entry();
assertFalse(newEntry1.empty());
assertFalse(newEntry1.tombstone());
diff --git a/modules/metastorage-server/src/test/java/org/apache/ignite/internal/metastorage/server/RocksDbKeyValueStorageTest.java b/modules/metastorage-server/src/test/java/org/apache/ignite/internal/metastorage/server/RocksDbKeyValueStorageTest.java
new file mode 100644
index 0000000..af39c1a
--- /dev/null
+++ b/modules/metastorage-server/src/test/java/org/apache/ignite/internal/metastorage/server/RocksDbKeyValueStorageTest.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.metastorage.server;
+
+import java.nio.file.Path;
+import org.apache.ignite.internal.metastorage.server.persistence.RocksDBKeyValueStorage;
+import org.apache.ignite.internal.testframework.WorkDirectory;
+import org.apache.ignite.internal.testframework.WorkDirectoryExtension;
+import org.junit.jupiter.api.extension.ExtendWith;
+
+/**
+ * Tests for RocksDB key-value storage implementation; supplies a {@link RocksDBKeyValueStorage} through {@link #storage()}.
+ */
+@ExtendWith(WorkDirectoryExtension.class)
+public class RocksDbKeyValueStorageTest extends AbstractKeyValueStorageTest {
+ /** Directory passed to the storage under test; presumably populated by {@link WorkDirectoryExtension} (TODO confirm). */
+ @WorkDirectory
+ private Path workDir;
+
+ /** {@inheritDoc} */
+ @Override KeyValueStorage storage() {
+ return new RocksDBKeyValueStorage(workDir);
+ }
+}
diff --git a/modules/metastorage-server/src/test/java/org/apache/ignite/internal/metastorage/server/SimpleInMemoryKeyValueStorageTest.java b/modules/metastorage-server/src/test/java/org/apache/ignite/internal/metastorage/server/SimpleInMemoryKeyValueStorageTest.java
index 508424f..fd68a7e 100644
--- a/modules/metastorage-server/src/test/java/org/apache/ignite/internal/metastorage/server/SimpleInMemoryKeyValueStorageTest.java
+++ b/modules/metastorage-server/src/test/java/org/apache/ignite/internal/metastorage/server/SimpleInMemoryKeyValueStorageTest.java
@@ -17,2034 +17,12 @@
package org.apache.ignite.internal.metastorage.server;
-import java.util.Collection;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.NoSuchElementException;
-import java.util.stream.Collectors;
-import org.apache.ignite.internal.metastorage.common.OperationType;
-import org.apache.ignite.internal.util.Cursor;
-import org.apache.ignite.lang.ByteArray;
-import org.junit.jupiter.api.BeforeEach;
-import org.junit.jupiter.api.Test;
-import static java.util.function.Function.identity;
-import static org.junit.jupiter.api.Assertions.assertArrayEquals;
-import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.junit.jupiter.api.Assertions.assertFalse;
-import static org.junit.jupiter.api.Assertions.assertNotNull;
-import static org.junit.jupiter.api.Assertions.assertNull;
-import static org.junit.jupiter.api.Assertions.assertTrue;
-import static org.junit.jupiter.api.Assertions.fail;
-
/**
- * Tests for in-memory meta storage implementation.
+ * Tests for in-memory key-value storage implementation.
*/
-class SimpleInMemoryKeyValueStorageTest {
- /** */
- private KeyValueStorage storage;
-
- @BeforeEach
- public void setUp() {
- storage = new SimpleInMemoryKeyValueStorage();
- }
-
- @Test
- public void put() {
- byte[] key = k(1);
- byte[] val = kv(1, 1);
-
- assertEquals(0, storage.revision());
- assertEquals(0, storage.updateCounter());
- assertTrue(storage.get(key).empty());
-
- storage.put(key, val);
-
- assertEquals(1, storage.revision());
- assertEquals(1, storage.updateCounter());
-
- Entry e = storage.get(key);
-
- assertFalse(e.empty());
- assertFalse(e.tombstone());
- assertEquals(1, e.revision());
- assertEquals(1, e.updateCounter());
-
- storage.put(key, val);
-
- assertEquals(2, storage.revision());
- assertEquals(2, storage.updateCounter());
-
- e = storage.get(key);
-
- assertFalse(e.empty());
- assertFalse(e.tombstone());
- assertEquals(2, e.revision());
- assertEquals(2, e.updateCounter());
- }
-
- @Test
- void getAll() {
- byte[] key1 = k(1);
- byte[] val1 = kv(1, 1);
-
- byte[] key2 = k(2);
- byte[] val2_1 = kv(2, 21);
- byte[] val2_2 = kv(2, 22);
-
- byte[] key3 = k(3);
- byte[] val3 = kv(3, 3);
-
- byte[] key4 = k(4);
-
- assertEquals(0, storage.revision());
- assertEquals(0, storage.updateCounter());
-
- // Regular put.
- storage.put(key1, val1);
-
- // Rewrite.
- storage.put(key2, val2_1);
- storage.put(key2, val2_2);
-
- // Remove.
- storage.put(key3, val3);
- storage.remove(key3);
-
- assertEquals(5, storage.revision());
- assertEquals(5, storage.updateCounter());
-
- Collection<Entry> entries = storage.getAll(List.of(key1, key2, key3, key4));
-
- assertEquals(4, entries.size());
-
- Map<ByteArray, Entry> map = entries.stream().collect(Collectors.toMap(e -> new ByteArray(e.key()), identity()));
-
- // Test regular put value.
- Entry e1 = map.get(new ByteArray(key1));
-
- assertNotNull(e1);
- assertEquals(1, e1.revision());
- assertEquals(1, e1.updateCounter());
- assertFalse(e1.tombstone());
- assertFalse(e1.empty());
- assertArrayEquals(val1, e1.value());
-
- // Test rewritten value.
- Entry e2 = map.get(new ByteArray(key2));
-
- assertNotNull(e2);
- assertEquals(3, e2.revision());
- assertEquals(3, e2.updateCounter());
- assertFalse(e2.tombstone());
- assertFalse(e2.empty());
- assertArrayEquals(val2_2, e2.value());
-
- // Test removed value.
- Entry e3 = map.get(new ByteArray(key3));
-
- assertNotNull(e3);
- assertEquals(5, e3.revision());
- assertEquals(5, e3.updateCounter());
- assertTrue(e3.tombstone());
- assertFalse(e3.empty());
-
- // Test empty value.
- Entry e4 = map.get(new ByteArray(key4));
-
- assertNotNull(e4);
- assertFalse(e4.tombstone());
- assertTrue(e4.empty());
- }
-
- @Test
- void getAllWithRevisionBound() {
- byte[] key1 = k(1);
- byte[] val1 = kv(1, 1);
-
- byte[] key2 = k(2);
- byte[] val2_1 = kv(2, 21);
- byte[] val2_2 = kv(2, 22);
-
- byte[] key3 = k(3);
- byte[] val3 = kv(3, 3);
-
- byte[] key4 = k(4);
-
- assertEquals(0, storage.revision());
- assertEquals(0, storage.updateCounter());
-
- // Regular put.
- storage.put(key1, val1);
-
- // Rewrite.
- storage.put(key2, val2_1);
- storage.put(key2, val2_2);
-
- // Remove.
- storage.put(key3, val3);
- storage.remove(key3);
-
- assertEquals(5, storage.revision());
- assertEquals(5, storage.updateCounter());
-
- // Bounded by revision 2.
- Collection<Entry> entries = storage.getAll(List.of(key1, key2, key3, key4), 2);
-
- assertEquals(4, entries.size());
-
- Map<ByteArray, Entry> map = entries.stream().collect(Collectors.toMap(e -> new ByteArray(e.key()), identity()));
-
- // Test regular put value.
- Entry e1 = map.get(new ByteArray(key1));
-
- assertNotNull(e1);
- assertEquals(1, e1.revision());
- assertEquals(1, e1.updateCounter());
- assertFalse(e1.tombstone());
- assertFalse(e1.empty());
- assertArrayEquals(val1, e1.value());
-
- // Test while not rewritten value.
- Entry e2 = map.get(new ByteArray(key2));
-
- assertNotNull(e2);
- assertEquals(2, e2.revision());
- assertEquals(2, e2.updateCounter());
- assertFalse(e2.tombstone());
- assertFalse(e2.empty());
- assertArrayEquals(val2_1, e2.value());
-
- // Values with larger revision don't exist yet.
- Entry e3 = map.get(new ByteArray(key3));
-
- assertNotNull(e3);
- assertTrue(e3.empty());
-
- Entry e4 = map.get(new ByteArray(key4));
-
- assertNotNull(e4);
- assertTrue(e4.empty());
-
- // Bounded by revision 4.
- entries = storage.getAll(List.of(key1, key2, key3, key4), 4);
-
- assertEquals(4, entries.size());
-
- map = entries.stream().collect(Collectors.toMap(e -> new ByteArray(e.key()), identity()));
-
- // Test regular put value.
- e1 = map.get(new ByteArray(key1));
-
- assertNotNull(e1);
- assertEquals(1, e1.revision());
- assertEquals(1, e1.updateCounter());
- assertFalse(e1.tombstone());
- assertFalse(e1.empty());
- assertArrayEquals(val1, e1.value());
-
- // Test rewritten value.
- e2 = map.get(new ByteArray(key2));
-
- assertNotNull(e2);
- assertEquals(3, e2.revision());
- assertEquals(3, e2.updateCounter());
- assertFalse(e2.tombstone());
- assertFalse(e2.empty());
- assertArrayEquals(val2_2, e2.value());
-
- // Test not removed value.
- e3 = map.get(new ByteArray(key3));
-
- assertNotNull(e3);
- assertEquals(4, e3.revision());
- assertEquals(4, e3.updateCounter());
- assertFalse(e3.tombstone());
- assertFalse(e3.empty());
- assertArrayEquals(val3, e3.value());
-
- // Value with larger revision doesn't exist yet.
- e4 = map.get(new ByteArray(key4));
-
- assertNotNull(e4);
- assertTrue(e4.empty());
- }
-
- @Test
- public void getAndPut() {
- byte[] key = k(1);
- byte[] val = kv(1, 1);
-
- assertEquals(0, storage.revision());
- assertEquals(0, storage.updateCounter());
- assertTrue(storage.get(key).empty());
-
- Entry e = storage.getAndPut(key, val);
-
- assertEquals(1, storage.revision());
- assertEquals(1, storage.updateCounter());
- assertTrue(e.empty());
- assertFalse(e.tombstone());
- assertEquals(0, e.revision());
- assertEquals(0, e.updateCounter());
-
- e = storage.getAndPut(key, val);
-
- assertEquals(2, storage.revision());
- assertEquals(2, storage.updateCounter());
- assertFalse(e.empty());
- assertFalse(e.tombstone());
- assertEquals(1, e.revision());
- assertEquals(1, e.updateCounter());
- }
-
- @Test
- public void putAll() {
- byte[] key1 = k(1);
- byte[] val1 = kv(1, 1);
-
- byte[] key2 = k(2);
- byte[] val2_1 = kv(2, 21);
- byte[] val2_2 = kv(2, 22);
-
- byte[] key3 = k(3);
- byte[] val3_1 = kv(3, 31);
- byte[] val3_2 = kv(3, 32);
-
- byte[] key4 = k(4);
-
- assertEquals(0, storage.revision());
- assertEquals(0, storage.updateCounter());
-
- // Must be rewritten.
- storage.put(key2, val2_1);
-
- // Remove. Tombstone must be replaced by new value.
- storage.put(key3, val3_1);
- storage.remove(key3);
-
- assertEquals(3, storage.revision());
- assertEquals(3, storage.updateCounter());
-
- storage.putAll(List.of(key1, key2, key3), List.of(val1, val2_2, val3_2));
-
- assertEquals(4, storage.revision());
- assertEquals(6, storage.updateCounter());
-
- Collection<Entry> entries = storage.getAll(List.of(key1, key2, key3, key4));
-
- assertEquals(4, entries.size());
-
- Map<ByteArray, Entry> map = entries.stream().collect(Collectors.toMap(e -> new ByteArray(e.key()), identity()));
-
- // Test regular put value.
- Entry e1 = map.get(new ByteArray(key1));
-
- assertNotNull(e1);
- assertEquals(4, e1.revision());
- assertEquals(4, e1.updateCounter());
- assertFalse(e1.tombstone());
- assertFalse(e1.empty());
- assertArrayEquals(val1, e1.value());
-
- // Test rewritten value.
- Entry e2 = map.get(new ByteArray(key2));
-
- assertNotNull(e2);
- assertEquals(4, e2.revision());
- assertEquals(5, e2.updateCounter());
- assertFalse(e2.tombstone());
- assertFalse(e2.empty());
- assertArrayEquals(val2_2, e2.value());
-
- // Test removed value.
- Entry e3 = map.get(new ByteArray(key3));
-
- assertNotNull(e3);
- assertEquals(4, e3.revision());
- assertEquals(6, e3.updateCounter());
- assertFalse(e3.tombstone());
- assertFalse(e3.empty());
-
- // Test empty value.
- Entry e4 = map.get(new ByteArray(key4));
-
- assertNotNull(e4);
- assertFalse(e4.tombstone());
- assertTrue(e4.empty());
- }
-
- @Test
- public void getAndPutAll() {
- byte[] key1 = k(1);
- byte[] val1 = kv(1, 1);
-
- byte[] key2 = k(2);
- byte[] val2_1 = kv(2, 21);
- byte[] val2_2 = kv(2, 22);
-
- byte[] key3 = k(3);
- byte[] val3_1 = kv(3, 31);
- byte[] val3_2 = kv(3, 32);
-
- byte[] key4 = k(4);
-
- assertEquals(0, storage.revision());
- assertEquals(0, storage.updateCounter());
-
- // Must be rewritten.
- storage.put(key2, val2_1);
-
- // Remove. Tombstone must be replaced by new value.
- storage.put(key3, val3_1);
- storage.remove(key3);
-
- assertEquals(3, storage.revision());
- assertEquals(3, storage.updateCounter());
-
- Collection<Entry> entries = storage.getAndPutAll(List.of(key1, key2, key3), List.of(val1, val2_2, val3_2));
-
- assertEquals(4, storage.revision());
- assertEquals(6, storage.updateCounter());
-
- assertEquals(3, entries.size());
-
- Map<ByteArray, Entry> map = entries.stream().collect(Collectors.toMap(e -> new ByteArray(e.key()), identity()));
-
- // Test regular put value.
- Entry e1 = map.get(new ByteArray(key1));
-
- assertNotNull(e1);
- assertEquals(0, e1.revision());
- assertEquals(0, e1.updateCounter());
- assertFalse(e1.tombstone());
- assertTrue(e1.empty());
-
- // Test rewritten value.
- Entry e2 = map.get(new ByteArray(key2));
-
- assertNotNull(e2);
- assertEquals(1, e2.revision());
- assertEquals(1, e2.updateCounter());
- assertFalse(e2.tombstone());
- assertFalse(e2.empty());
- assertArrayEquals(val2_1, e2.value());
-
- // Test removed value.
- Entry e3 = map.get(new ByteArray(key3));
-
- assertNotNull(e3);
- assertEquals(3, e3.revision());
- assertEquals(3, e3.updateCounter());
- assertTrue(e3.tombstone());
- assertFalse(e3.empty());
-
- // Test state after putAll.
- entries = storage.getAll(List.of(key1, key2, key3, key4));
-
- assertEquals(4, entries.size());
-
- map = entries.stream().collect(Collectors.toMap(e -> new ByteArray(e.key()), identity()));
-
- // Test regular put value.
- e1 = map.get(new ByteArray(key1));
-
- assertNotNull(e1);
- assertEquals(4, e1.revision());
- assertEquals(4, e1.updateCounter());
- assertFalse(e1.tombstone());
- assertFalse(e1.empty());
- assertArrayEquals(val1, e1.value());
-
- // Test rewritten value.
- e2 = map.get(new ByteArray(key2));
-
- assertNotNull(e2);
- assertEquals(4, e2.revision());
- assertEquals(5, e2.updateCounter());
- assertFalse(e2.tombstone());
- assertFalse(e2.empty());
- assertArrayEquals(val2_2, e2.value());
-
- // Test removed value.
- e3 = map.get(new ByteArray(key3));
-
- assertNotNull(e3);
- assertEquals(4, e3.revision());
- assertEquals(6, e3.updateCounter());
- assertFalse(e3.tombstone());
- assertFalse(e3.empty());
-
- // Test empty value.
- Entry e4 = map.get(new ByteArray(key4));
-
- assertNotNull(e4);
- assertFalse(e4.tombstone());
- assertTrue(e4.empty());
- }
-
- @Test
- public void remove() {
- byte[] key = k(1);
- byte[] val = kv(1, 1);
-
- assertEquals(0, storage.revision());
- assertEquals(0, storage.updateCounter());
- assertTrue(storage.get(key).empty());
-
- // Remove non-existent entry.
- storage.remove(key);
-
- assertEquals(0, storage.revision());
- assertEquals(0, storage.updateCounter());
- assertTrue(storage.get(key).empty());
-
- storage.put(key, val);
-
- assertEquals(1, storage.revision());
- assertEquals(1, storage.updateCounter());
-
- // Remove existent entry.
- storage.remove(key);
-
- assertEquals(2, storage.revision());
- assertEquals(2, storage.updateCounter());
-
- Entry e = storage.get(key);
-
- assertFalse(e.empty());
- assertTrue(e.tombstone());
- assertEquals(2, e.revision());
- assertEquals(2, e.updateCounter());
-
- // Remove already removed entry (tombstone can't be removed).
- storage.remove(key);
-
- assertEquals(2, storage.revision());
- assertEquals(2, storage.updateCounter());
-
- e = storage.get(key);
-
- assertFalse(e.empty());
- assertTrue(e.tombstone());
- assertEquals(2, e.revision());
- assertEquals(2, e.updateCounter());
- }
-
- @Test
- public void getAndRemove() {
- byte[] key = k(1);
- byte[] val = kv(1, 1);
-
- assertEquals(0, storage.revision());
- assertEquals(0, storage.updateCounter());
- assertTrue(storage.get(key).empty());
-
- // Remove non-existent entry.
- Entry e = storage.getAndRemove(key);
-
- assertTrue(e.empty());
- assertEquals(0, storage.revision());
- assertEquals(0, storage.updateCounter());
- assertTrue(storage.get(key).empty());
-
- storage.put(key, val);
-
- assertEquals(1, storage.revision());
- assertEquals(1, storage.updateCounter());
-
- // Remove existent entry.
- e = storage.getAndRemove(key);
-
- assertFalse(e.empty());
- assertFalse(e.tombstone());
- assertEquals(1, e.revision());
- assertEquals(1, e.updateCounter());
- assertEquals(2, storage.revision());
- assertEquals(2, storage.updateCounter());
-
- e = storage.get(key);
-
- assertFalse(e.empty());
- assertTrue(e.tombstone());
- assertEquals(2, e.revision());
- assertEquals(2, e.updateCounter());
-
- // Remove already removed entry (tombstone can't be removed).
- e = storage.getAndRemove(key);
-
- assertFalse(e.empty());
- assertTrue(e.tombstone());
- assertEquals(2, e.revision());
- assertEquals(2, e.updateCounter());
- assertEquals(2, storage.revision());
- assertEquals(2, storage.updateCounter());
-
- e = storage.get(key);
-
- assertFalse(e.empty());
- assertTrue(e.tombstone());
- assertEquals(2, e.revision());
- assertEquals(2, e.updateCounter());
- }
-
- @Test
- public void removeAll() {
- byte[] key1 = k(1);
- byte[] val1 = kv(1, 1);
-
- byte[] key2 = k(2);
- byte[] val2_1 = kv(2, 21);
- byte[] val2_2 = kv(2, 22);
-
- byte[] key3 = k(3);
- byte[] val3_1 = kv(3, 31);
-
- byte[] key4 = k(4);
-
- assertEquals(0, storage.revision());
- assertEquals(0, storage.updateCounter());
-
- // Regular put.
- storage.put(key1, val1);
-
- // Rewrite.
- storage.put(key2, val2_1);
- storage.put(key2, val2_2);
-
- // Remove. Tombstone must not be removed again.
- storage.put(key3, val3_1);
- storage.remove(key3);
-
- assertEquals(5, storage.revision());
- assertEquals(5, storage.updateCounter());
-
- storage.removeAll(List.of(key1, key2, key3, key4));
-
- assertEquals(6, storage.revision());
- assertEquals(7, storage.updateCounter()); // Only two keys are updated.
-
- Collection<Entry> entries = storage.getAll(List.of(key1, key2, key3, key4));
-
- assertEquals(4, entries.size());
-
- Map<ByteArray, Entry> map = entries.stream().collect(Collectors.toMap(e -> new ByteArray(e.key()), identity()));
-
- // Test regular put value.
- Entry e1 = map.get(new ByteArray(key1));
-
- assertNotNull(e1);
- assertEquals(6, e1.revision());
- assertEquals(6, e1.updateCounter());
- assertTrue(e1.tombstone());
- assertFalse(e1.empty());
-
- // Test rewritten value.
- Entry e2 = map.get(new ByteArray(key2));
-
- assertNotNull(e2);
- assertEquals(6, e2.revision());
- assertEquals(7, e2.updateCounter());
- assertTrue(e2.tombstone());
- assertFalse(e2.empty());
-
- // Test removed value.
- Entry e3 = map.get(new ByteArray(key3));
-
- assertNotNull(e3);
- assertEquals(5, e3.revision());
- assertEquals(5, e3.updateCounter());
- assertTrue(e3.tombstone());
- assertFalse(e3.empty());
-
- // Test empty value.
- Entry e4 = map.get(new ByteArray(key4));
-
- assertNotNull(e4);
- assertFalse(e4.tombstone());
- assertTrue(e4.empty());
- }
-
- @Test
- public void getAndRemoveAll() {
- byte[] key1 = k(1);
- byte[] val1 = kv(1, 1);
-
- byte[] key2 = k(2);
- byte[] val2_1 = kv(2, 21);
- byte[] val2_2 = kv(2, 22);
-
- byte[] key3 = k(3);
- byte[] val3_1 = kv(3, 31);
-
- byte[] key4 = k(4);
-
- assertEquals(0, storage.revision());
- assertEquals(0, storage.updateCounter());
-
- // Regular put.
- storage.put(key1, val1);
-
- // Rewrite.
- storage.put(key2, val2_1);
- storage.put(key2, val2_2);
-
- // Remove. Tombstone must not be removed again.
- storage.put(key3, val3_1);
- storage.remove(key3);
-
- assertEquals(5, storage.revision());
- assertEquals(5, storage.updateCounter());
-
- Collection<Entry> entries = storage.getAndRemoveAll(List.of(key1, key2, key3, key4));
-
- assertEquals(6, storage.revision());
- assertEquals(7, storage.updateCounter()); // Only two keys are updated.
-
- assertEquals(4, entries.size());
-
- Map<ByteArray, Entry> map = entries.stream().collect(Collectors.toMap(e -> new ByteArray(e.key()), identity()));
-
- // Test regular put value.
- Entry e1 = map.get(new ByteArray(key1));
-
- assertNotNull(e1);
- assertEquals(1, e1.revision());
- assertEquals(1, e1.updateCounter());
- assertFalse(e1.tombstone());
- assertFalse(e1.empty());
-
- // Test rewritten value.
- Entry e2 = map.get(new ByteArray(key2));
-
- assertNotNull(e2);
- assertEquals(3, e2.revision());
- assertEquals(3, e2.updateCounter());
- assertFalse(e2.tombstone());
- assertFalse(e2.empty());
-
- // Test removed value.
- Entry e3 = map.get(new ByteArray(key3));
-
- assertNotNull(e3);
- assertEquals(5, e3.revision());
- assertEquals(5, e3.updateCounter());
- assertTrue(e3.tombstone());
- assertFalse(e3.empty());
-
- // Test empty value.
- Entry e4 = map.get(new ByteArray(key4));
-
- assertNotNull(e4);
- assertFalse(e4.tombstone());
- assertTrue(e4.empty());
-
- // Test state after getAndRemoveAll.
- entries = storage.getAll(List.of(key1, key2, key3, key4));
-
- assertEquals(4, entries.size());
-
- map = entries.stream().collect(Collectors.toMap(e -> new ByteArray(e.key()), identity()));
-
- // Test regular put value.
- e1 = map.get(new ByteArray(key1));
-
- assertNotNull(e1);
- assertEquals(6, e1.revision());
- assertEquals(6, e1.updateCounter());
- assertTrue(e1.tombstone());
- assertFalse(e1.empty());
-
- // Test rewritten value.
- e2 = map.get(new ByteArray(key2));
-
- assertNotNull(e2);
- assertEquals(6, e2.revision());
- assertEquals(7, e2.updateCounter());
- assertTrue(e2.tombstone());
- assertFalse(e2.empty());
-
- // Test removed value.
- e3 = map.get(new ByteArray(key3));
-
- assertNotNull(e3);
- assertEquals(5, e3.revision());
- assertEquals(5, e3.updateCounter());
- assertTrue(e3.tombstone());
- assertFalse(e3.empty());
-
- // Test empty value.
- e4 = map.get(new ByteArray(key4));
-
- assertNotNull(e4);
- assertFalse(e4.tombstone());
- assertTrue(e4.empty());
- }
-
- @Test
- public void getAfterRemove() {
- byte[] key = k(1);
- byte[] val = kv(1, 1);
-
- storage.getAndPut(key, val);
-
- storage.getAndRemove(key);
-
- Entry e = storage.get(key);
-
- assertEquals(2, storage.revision());
- assertEquals(2, storage.updateCounter());
- assertEquals(2, e.revision());
- assertTrue(e.tombstone());
- }
-
- @Test
- public void getAndPutAfterRemove() {
- byte[] key = k(1);
- byte[] val = kv(1, 1);
-
- storage.getAndPut(key, val);
-
- storage.getAndRemove(key);
-
- Entry e = storage.getAndPut(key, val);
-
- assertEquals(3, storage.revision());
- assertEquals(3, storage.updateCounter());
- assertEquals(2, e.revision());
- assertTrue(e.tombstone());
- }
-
- @Test
- public void putGetRemoveCompact() {
- byte[] key1 = k(1);
- byte[] val1_1 = kv(1, 1);
- byte[] val1_3 = kv(1, 3);
-
- byte[] key2 = k(2);
- byte[] val2_2 = kv(2, 2);
-
- assertEquals(0, storage.revision());
- assertEquals(0, storage.updateCounter());
-
- // Previous entry is empty.
- Entry emptyEntry = storage.getAndPut(key1, val1_1);
-
- assertEquals(1, storage.revision());
- assertEquals(1, storage.updateCounter());
- assertTrue(emptyEntry.empty());
-
- // Entry with rev == 1.
- Entry e1_1 = storage.get(key1);
-
- assertFalse(e1_1.empty());
- assertFalse(e1_1.tombstone());
- assertArrayEquals(key1, e1_1.key());
- assertArrayEquals(val1_1, e1_1.value());
- assertEquals(1, e1_1.revision());
- assertEquals(1, e1_1.updateCounter());
- assertEquals(1, storage.revision());
- assertEquals(1, storage.updateCounter());
-
- // Previous entry is empty.
- emptyEntry = storage.getAndPut(key2, val2_2);
-
- assertEquals(2, storage.revision());
- assertEquals(2, storage.updateCounter());
- assertTrue(emptyEntry.empty());
-
- // Entry with rev == 2.
- Entry e2 = storage.get(key2);
-
- assertFalse(e2.empty());
- assertFalse(e2.tombstone());
- assertArrayEquals(key2, e2.key());
- assertArrayEquals(val2_2, e2.value());
- assertEquals(2, e2.revision());
- assertEquals(2, e2.updateCounter());
- assertEquals(2, storage.revision());
- assertEquals(2, storage.updateCounter());
-
- // Previous entry is not empty.
- e1_1 = storage.getAndPut(key1, val1_3);
-
- assertFalse(e1_1.empty());
- assertFalse(e1_1.tombstone());
- assertArrayEquals(key1, e1_1.key());
- assertArrayEquals(val1_1, e1_1.value());
- assertEquals(1, e1_1.revision());
- assertEquals(1, e1_1.updateCounter());
- assertEquals(3, storage.revision());
- assertEquals(3, storage.updateCounter());
-
- // Entry with rev == 3.
- Entry e1_3 = storage.get(key1);
-
- assertFalse(e1_3.empty());
- assertFalse(e1_3.tombstone());
- assertArrayEquals(key1, e1_3.key());
- assertArrayEquals(val1_3, e1_3.value());
- assertEquals(3, e1_3.revision());
- assertEquals(3, e1_3.updateCounter());
- assertEquals(3, storage.revision());
- assertEquals(3, storage.updateCounter());
-
- // Remove existing entry.
- Entry e2_2 = storage.getAndRemove(key2);
-
- assertFalse(e2_2.empty());
- assertFalse(e2_2.tombstone());
- assertArrayEquals(key2, e2_2.key());
- assertArrayEquals(val2_2, e2_2.value());
- assertEquals(2, e2_2.revision());
- assertEquals(2, e2_2.updateCounter());
- assertEquals(4, storage.revision()); // Storage revision is changed.
- assertEquals(4, storage.updateCounter());
-
- // Remove already removed entry.
- Entry tombstoneEntry = storage.getAndRemove(key2);
-
- assertFalse(tombstoneEntry.empty());
- assertTrue(tombstoneEntry.tombstone());
- assertEquals(4, storage.revision()); // Storage revision is not changed.
- assertEquals(4, storage.updateCounter());
-
- // Compact and check that tombstones are removed.
- storage.compact();
-
- assertEquals(4, storage.revision());
- assertEquals(4, storage.updateCounter());
- assertTrue(storage.getAndRemove(key2).empty());
- assertTrue(storage.get(key2).empty());
-
- // Remove existing entry.
- e1_3 = storage.getAndRemove(key1);
-
- assertFalse(e1_3.empty());
- assertFalse(e1_3.tombstone());
- assertArrayEquals(key1, e1_3.key());
- assertArrayEquals(val1_3, e1_3.value());
- assertEquals(3, e1_3.revision());
- assertEquals(3, e1_3.updateCounter());
- assertEquals(5, storage.revision()); // Storage revision is changed.
- assertEquals(5, storage.updateCounter());
-
- // Remove already removed entry.
- tombstoneEntry = storage.getAndRemove(key1);
-
- assertFalse(tombstoneEntry.empty());
- assertTrue(tombstoneEntry.tombstone());
- assertEquals(5, storage.revision()); // // Storage revision is not changed.
- assertEquals(5, storage.updateCounter());
-
- // Compact and check that tombstones are removed.
- storage.compact();
-
- assertEquals(5, storage.revision());
- assertEquals(5, storage.updateCounter());
- assertTrue(storage.getAndRemove(key1).empty());
- assertTrue(storage.get(key1).empty());
- }
-
- @Test
- public void invokeWithRevisionCondition_successBranch() {
- byte[] key1 = k(1);
- byte[] val1_1 = kv(1, 11);
- byte[] val1_2 = kv(1, 12);
-
- byte[] key2 = k(2);
- byte[] val2 = kv(2, 2);
-
- byte[] key3 = k(3);
- byte[] val3 = kv(3, 3);
-
- assertEquals(0, storage.revision());
- assertEquals(0, storage.updateCounter());
-
- storage.put(key1, val1_1);
-
- assertEquals(1, storage.revision());
- assertEquals(1, storage.updateCounter());
-
- boolean branch = storage.invoke(
- new RevisionCondition(RevisionCondition.Type.EQUAL, key1, 1),
- List.of(
- new Operation(OperationType.PUT, key1, val1_2),
- new Operation(OperationType.PUT, key2, val2)
- ),
- List.of(new Operation(OperationType.PUT, key3, val3))
- );
-
- // "Success" branch is applied.
- assertTrue(branch);
- assertEquals(2, storage.revision());
- assertEquals(3, storage.updateCounter());
-
- Entry e1 = storage.get(key1);
-
- assertFalse(e1.empty());
- assertFalse(e1.tombstone());
- assertEquals(2, e1.revision());
- assertEquals(2, e1.updateCounter());
- assertArrayEquals(val1_2, e1.value());
-
- Entry e2 = storage.get(key2);
-
- assertFalse(e2.empty());
- assertFalse(e2.tombstone());
- assertEquals(2, e2.revision());
- assertEquals(3, e2.updateCounter());
- assertArrayEquals(val2, e2.value());
-
- // "Failure" branch isn't applied.
- Entry e3 = storage.get(key3);
-
- assertTrue(e3.empty());
- }
-
- @Test
- public void invokeWithRevisionCondition_failureBranch() {
- byte[] key1 = k(1);
- byte[] val1_1 = kv(1, 11);
- byte[] val1_2 = kv(1, 12);
-
- byte[] key2 = k(2);
- byte[] val2 = kv(2, 2);
-
- byte[] key3 = k(3);
- byte[] val3 = kv(3, 3);
-
- assertEquals(0, storage.revision());
- assertEquals(0, storage.updateCounter());
-
- storage.put(key1, val1_1);
-
- assertEquals(1, storage.revision());
- assertEquals(1, storage.updateCounter());
-
- boolean branch = storage.invoke(
- new RevisionCondition(RevisionCondition.Type.EQUAL, key1, 2),
- List.of(new Operation(OperationType.PUT, key3, val3)),
- List.of(
- new Operation(OperationType.PUT, key1, val1_2),
- new Operation(OperationType.PUT, key2, val2)
- )
- );
-
- // "Failure" branch is applied.
- assertFalse(branch);
- assertEquals(2, storage.revision());
- assertEquals(3, storage.updateCounter());
-
- Entry e1 = storage.get(key1);
-
- assertFalse(e1.empty());
- assertFalse(e1.tombstone());
- assertEquals(2, e1.revision());
- assertEquals(2, e1.updateCounter());
- assertArrayEquals(val1_2, e1.value());
-
- Entry e2 = storage.get(key2);
-
- assertFalse(e2.empty());
- assertFalse(e2.tombstone());
- assertEquals(2, e2.revision());
- assertEquals(3, e2.updateCounter());
- assertArrayEquals(val2, e2.value());
-
- // "Success" branch isn't applied.
- Entry e3 = storage.get(key3);
-
- assertTrue(e3.empty());
- }
-
- @Test
- public void invokeWithExistsCondition_successBranch() {
- byte[] key1 = k(1);
- byte[] val1_1 = kv(1, 11);
- byte[] val1_2 = kv(1, 12);
-
- byte[] key2 = k(2);
- byte[] val2 = kv(2, 2);
-
- byte[] key3 = k(3);
- byte[] val3 = kv(3, 3);
-
- assertEquals(0, storage.revision());
- assertEquals(0, storage.updateCounter());
-
- storage.put(key1, val1_1);
-
- assertEquals(1, storage.revision());
- assertEquals(1, storage.updateCounter());
-
- boolean branch = storage.invoke(
- new ExistenceCondition(ExistenceCondition.Type.EXISTS, key1),
- List.of(
- new Operation(OperationType.PUT, key1, val1_2),
- new Operation(OperationType.PUT, key2, val2)
- ),
- List.of(new Operation(OperationType.PUT, key3, val3))
- );
-
- // "Success" branch is applied.
- assertTrue(branch);
- assertEquals(2, storage.revision());
- assertEquals(3, storage.updateCounter());
-
- Entry e1 = storage.get(key1);
-
- assertFalse(e1.empty());
- assertFalse(e1.tombstone());
- assertEquals(2, e1.revision());
- assertEquals(2, e1.updateCounter());
- assertArrayEquals(val1_2, e1.value());
-
- Entry e2 = storage.get(key2);
-
- assertFalse(e2.empty());
- assertFalse(e2.tombstone());
- assertEquals(2, e2.revision());
- assertEquals(3, e2.updateCounter());
- assertArrayEquals(val2, e2.value());
-
- // "Failure" branch isn't applied.
- Entry e3 = storage.get(key3);
-
- assertTrue(e3.empty());
- }
-
- @Test
- public void invokeWithExistsCondition_failureBranch() {
- byte[] key1 = k(1);
- byte[] val1_1 = kv(1, 11);
- byte[] val1_2 = kv(1, 12);
-
- byte[] key2 = k(2);
- byte[] val2 = kv(2, 2);
-
- byte[] key3 = k(3);
- byte[] val3 = kv(3, 3);
-
- assertEquals(0, storage.revision());
- assertEquals(0, storage.updateCounter());
-
- storage.put(key1, val1_1);
-
- assertEquals(1, storage.revision());
- assertEquals(1, storage.updateCounter());
-
- boolean branch = storage.invoke(
- new ExistenceCondition(ExistenceCondition.Type.EXISTS, key3),
- List.of(new Operation(OperationType.PUT, key3, val3)),
- List.of(
- new Operation(OperationType.PUT, key1, val1_2),
- new Operation(OperationType.PUT, key2, val2)
- )
- );
-
- // "Failure" branch is applied.
- assertFalse(branch);
- assertEquals(2, storage.revision());
- assertEquals(3, storage.updateCounter());
-
- Entry e1 = storage.get(key1);
-
- assertFalse(e1.empty());
- assertFalse(e1.tombstone());
- assertEquals(2, e1.revision());
- assertEquals(2, e1.updateCounter());
- assertArrayEquals(val1_2, e1.value());
-
- Entry e2 = storage.get(key2);
-
- assertFalse(e2.empty());
- assertFalse(e2.tombstone());
- assertEquals(2, e2.revision());
- assertEquals(3, e2.updateCounter());
- assertArrayEquals(val2, e2.value());
-
- // "Success" branch isn't applied.
- Entry e3 = storage.get(key3);
-
- assertTrue(e3.empty());
- }
-
- @Test
- public void invokeWithNotExistsCondition_successBranch() {
- byte[] key1 = k(1);
- byte[] val1_1 = kv(1, 11);
- byte[] val1_2 = kv(1, 12);
-
- byte[] key2 = k(2);
- byte[] val2 = kv(2, 2);
-
- byte[] key3 = k(3);
- byte[] val3 = kv(3, 3);
-
- assertEquals(0, storage.revision());
- assertEquals(0, storage.updateCounter());
-
- storage.put(key1, val1_1);
-
- assertEquals(1, storage.revision());
- assertEquals(1, storage.updateCounter());
-
- boolean branch = storage.invoke(
- new ExistenceCondition(ExistenceCondition.Type.NOT_EXISTS, key2),
- List.of(
- new Operation(OperationType.PUT, key1, val1_2),
- new Operation(OperationType.PUT, key2, val2)
- ),
- List.of(new Operation(OperationType.PUT, key3, val3))
- );
-
- // "Success" branch is applied.
- assertTrue(branch);
- assertEquals(2, storage.revision());
- assertEquals(3, storage.updateCounter());
-
- Entry e1 = storage.get(key1);
-
- assertFalse(e1.empty());
- assertFalse(e1.tombstone());
- assertEquals(2, e1.revision());
- assertEquals(2, e1.updateCounter());
- assertArrayEquals(val1_2, e1.value());
-
- Entry e2 = storage.get(key2);
-
- assertFalse(e2.empty());
- assertFalse(e2.tombstone());
- assertEquals(2, e2.revision());
- assertEquals(3, e2.updateCounter());
- assertArrayEquals(val2, e2.value());
-
- // "Failure" branch isn't applied.
- Entry e3 = storage.get(key3);
-
- assertTrue(e3.empty());
- }
-
- @Test
- public void invokeWithNotExistsCondition_failureBranch() {
- byte[] key1 = k(1);
- byte[] val1_1 = kv(1, 11);
- byte[] val1_2 = kv(1, 12);
-
- byte[] key2 = k(2);
- byte[] val2 = kv(2, 2);
-
- byte[] key3 = k(3);
- byte[] val3 = kv(3, 3);
-
- assertEquals(0, storage.revision());
- assertEquals(0, storage.updateCounter());
-
- storage.put(key1, val1_1);
-
- assertEquals(1, storage.revision());
- assertEquals(1, storage.updateCounter());
-
- boolean branch = storage.invoke(
- new ExistenceCondition(ExistenceCondition.Type.NOT_EXISTS, key1),
- List.of(new Operation(OperationType.PUT, key3, val3)),
- List.of(
- new Operation(OperationType.PUT, key1, val1_2),
- new Operation(OperationType.PUT, key2, val2)
- )
- );
-
- // "Failure" branch is applied.
- assertFalse(branch);
- assertEquals(2, storage.revision());
- assertEquals(3, storage.updateCounter());
-
- Entry e1 = storage.get(key1);
-
- assertFalse(e1.empty());
- assertFalse(e1.tombstone());
- assertEquals(2, e1.revision());
- assertEquals(2, e1.updateCounter());
- assertArrayEquals(val1_2, e1.value());
-
- Entry e2 = storage.get(key2);
-
- assertFalse(e2.empty());
- assertFalse(e2.tombstone());
- assertEquals(2, e2.revision());
- assertEquals(3, e2.updateCounter());
- assertArrayEquals(val2, e2.value());
-
- // "Success" branch isn't applied.
- Entry e3 = storage.get(key3);
-
- assertTrue(e3.empty());
- }
-
- @Test
- public void invokeWithTombstoneCondition_successBranch() {
- byte[] key1 = k(1);
- byte[] val1_1 = kv(1, 11);
-
- byte[] key2 = k(2);
- byte[] val2 = kv(2, 2);
-
- byte[] key3 = k(3);
- byte[] val3 = kv(3, 3);
-
- assertEquals(0, storage.revision());
- assertEquals(0, storage.updateCounter());
-
- storage.put(key1, val1_1);
- storage.remove(key1); // Should be tombstone after remove.
-
- assertEquals(2, storage.revision());
- assertEquals(2, storage.updateCounter());
-
- boolean branch = storage.invoke(
- new TombstoneCondition(key1),
- List.of(new Operation(OperationType.PUT, key2, val2)),
- List.of(new Operation(OperationType.PUT, key3, val3))
- );
-
- // "Success" branch is applied.
- assertTrue(branch);
- assertEquals(3, storage.revision());
- assertEquals(3, storage.updateCounter());
-
- Entry e1 = storage.get(key1);
-
- assertFalse(e1.empty());
- assertTrue(e1.tombstone());
- assertEquals(2, e1.revision());
- assertEquals(2, e1.updateCounter());
- assertNull(e1.value());
-
- Entry e2 = storage.get(key2);
-
- assertFalse(e2.empty());
- assertFalse(e2.tombstone());
- assertEquals(3, e2.revision());
- assertEquals(3, e2.updateCounter());
- assertArrayEquals(val2, e2.value());
-
- // "Failure" branch isn't applied.
- Entry e3 = storage.get(key3);
-
- assertTrue(e3.empty());
- }
-
- @Test
- public void invokeWithTombstoneCondition_failureBranch() {
- byte[] key1 = k(1);
- byte[] val1_1 = kv(1, 11);
-
- byte[] key2 = k(2);
- byte[] val2 = kv(2, 2);
-
- byte[] key3 = k(3);
- byte[] val3 = kv(3, 3);
-
- assertEquals(0, storage.revision());
- assertEquals(0, storage.updateCounter());
-
- storage.put(key1, val1_1);
-
- assertEquals(1, storage.revision());
- assertEquals(1, storage.updateCounter());
-
- boolean branch = storage.invoke(
- new TombstoneCondition(key1),
- List.of(new Operation(OperationType.PUT, key2, val2)),
- List.of(new Operation(OperationType.PUT, key3, val3))
- );
-
- // "Failure" branch is applied.
- assertFalse(branch);
- assertEquals(2, storage.revision());
- assertEquals(2, storage.updateCounter());
-
- Entry e1 = storage.get(key1);
-
- assertFalse(e1.empty());
- assertFalse(e1.tombstone());
- assertEquals(1, e1.revision());
- assertEquals(1, e1.updateCounter());
- assertArrayEquals(val1_1, e1.value());
-
- Entry e3 = storage.get(key3);
-
- assertFalse(e3.empty());
- assertFalse(e3.tombstone());
- assertEquals(2, e3.revision());
- assertEquals(2, e3.updateCounter());
- assertArrayEquals(val3, e3.value());
-
- // "Success" branch isn't applied.
- Entry e2 = storage.get(key2);
-
- assertTrue(e2.empty());
- }
-
- @Test
- public void invokeWithValueCondition_successBranch() {
- byte[] key1 = k(1);
- byte[] val1_1 = kv(1, 11);
- byte[] val1_2 = kv(1, 12);
-
- byte[] key2 = k(2);
- byte[] val2 = kv(2, 2);
-
- byte[] key3 = k(3);
- byte[] val3 = kv(3, 3);
-
- assertEquals(0, storage.revision());
- assertEquals(0, storage.updateCounter());
-
- storage.put(key1, val1_1);
-
- assertEquals(1, storage.revision());
- assertEquals(1, storage.updateCounter());
-
- boolean branch = storage.invoke(
- new ValueCondition(ValueCondition.Type.EQUAL, key1, val1_1),
- List.of(
- new Operation(OperationType.PUT, key1, val1_2),
- new Operation(OperationType.PUT, key2, val2)
- ),
- List.of(new Operation(OperationType.PUT, key3, val3))
- );
-
- // "Success" branch is applied.
- assertTrue(branch);
- assertEquals(2, storage.revision());
- assertEquals(3, storage.updateCounter());
-
- Entry e1 = storage.get(key1);
-
- assertFalse(e1.empty());
- assertFalse(e1.tombstone());
- assertEquals(2, e1.revision());
- assertEquals(2, e1.updateCounter());
- assertArrayEquals(val1_2, e1.value());
-
- Entry e2 = storage.get(key2);
-
- assertFalse(e2.empty());
- assertFalse(e2.tombstone());
- assertEquals(2, e2.revision());
- assertEquals(3, e2.updateCounter());
- assertArrayEquals(val2, e2.value());
-
- // "Failure" branch isn't applied.
- Entry e3 = storage.get(key3);
-
- assertTrue(e3.empty());
- }
-
- @Test
- public void invokeWithValueCondition_failureBranch() {
- byte[] key1 = k(1);
- byte[] val1_1 = kv(1, 11);
- byte[] val1_2 = kv(1, 12);
-
- byte[] key2 = k(2);
- byte[] val2 = kv(2, 2);
-
- byte[] key3 = k(3);
- byte[] val3 = kv(3, 3);
-
- assertEquals(0, storage.revision());
- assertEquals(0, storage.updateCounter());
-
- storage.put(key1, val1_1);
-
- assertEquals(1, storage.revision());
- assertEquals(1, storage.updateCounter());
-
- boolean branch = storage.invoke(
- new ValueCondition(ValueCondition.Type.EQUAL, key1, val1_2),
- List.of(new Operation(OperationType.PUT, key3, val3)),
- List.of(
- new Operation(OperationType.PUT, key1, val1_2),
- new Operation(OperationType.PUT, key2, val2)
- )
- );
-
- // "Failure" branch is applied.
- assertFalse(branch);
- assertEquals(2, storage.revision());
- assertEquals(3, storage.updateCounter());
-
- Entry e1 = storage.get(key1);
-
- assertFalse(e1.empty());
- assertFalse(e1.tombstone());
- assertEquals(2, e1.revision());
- assertEquals(2, e1.updateCounter());
- assertArrayEquals(val1_2, e1.value());
-
- Entry e2 = storage.get(key2);
-
- assertFalse(e2.empty());
- assertFalse(e2.tombstone());
- assertEquals(2, e2.revision());
- assertEquals(3, e2.updateCounter());
- assertArrayEquals(val2, e2.value());
-
- // "Success" branch isn't applied.
- Entry e3 = storage.get(key3);
-
- assertTrue(e3.empty());
- }
-
- @Test
- public void invokeOperations() {
- byte[] key1 = k(1);
- byte[] val1 = kv(1, 1);
-
- byte[] key2 = k(2);
- byte[] val2 = kv(2, 2);
-
- byte[] key3 = k(3);
- byte[] val3 = kv(3, 3);
-
- assertEquals(0, storage.revision());
- assertEquals(0, storage.updateCounter());
-
- storage.put(key1, val1);
-
- assertEquals(1, storage.revision());
- assertEquals(1, storage.updateCounter());
-
- // No-op.
- boolean branch = storage.invoke(
- new ValueCondition(ValueCondition.Type.EQUAL, key1, val1),
- List.of(new Operation(OperationType.NO_OP, null, null)),
- List.of(new Operation(OperationType.NO_OP, null, null))
- );
-
- assertTrue(branch);
-
- // No updates.
- assertEquals(1, storage.revision());
- assertEquals(1, storage.updateCounter());
-
- // Put.
- branch = storage.invoke(
- new ValueCondition(ValueCondition.Type.EQUAL, key1, val1),
- List.of(
- new Operation(OperationType.PUT, key2, val2),
- new Operation(OperationType.PUT, key3, val3)
- ),
- List.of(new Operation(OperationType.NO_OP, null, null))
- );
-
- assertTrue(branch);
-
- // +1 for revision, +2 for update counter.
- assertEquals(2, storage.revision());
- assertEquals(3, storage.updateCounter());
-
- Entry e2 = storage.get(key2);
-
- assertFalse(e2.empty());
- assertFalse(e2.tombstone());
- assertEquals(2, e2.revision());
- assertEquals(2, e2.updateCounter());
- assertArrayEquals(key2, e2.key());
- assertArrayEquals(val2, e2.value());
-
- Entry e3 = storage.get(key3);
-
- assertFalse(e3.empty());
- assertFalse(e3.tombstone());
- assertEquals(2, e3.revision());
- assertEquals(3, e3.updateCounter());
- assertArrayEquals(key3, e3.key());
- assertArrayEquals(val3, e3.value());
-
- // Remove.
- branch = storage.invoke(
- new ValueCondition(ValueCondition.Type.EQUAL, key1, val1),
- List.of(
- new Operation(OperationType.REMOVE, key2, null),
- new Operation(OperationType.REMOVE, key3, null)
- ),
- List.of(new Operation(OperationType.NO_OP, null, null))
- );
-
- assertTrue(branch);
-
- // +1 for revision, +2 for update counter.
- assertEquals(3, storage.revision());
- assertEquals(5, storage.updateCounter());
-
- e2 = storage.get(key2);
-
- assertFalse(e2.empty());
- assertTrue(e2.tombstone());
- assertEquals(3, e2.revision());
- assertEquals(4, e2.updateCounter());
- assertArrayEquals(key2, e2.key());
-
- e3 = storage.get(key3);
-
- assertFalse(e3.empty());
- assertTrue(e3.tombstone());
- assertEquals(3, e3.revision());
- assertEquals(5, e3.updateCounter());
- assertArrayEquals(key3, e3.key());
- }
-
- @Test
- public void compact() {
- assertEquals(0, storage.revision());
- assertEquals(0, storage.updateCounter());
-
- // Compact empty.
- storage.compact();
-
- assertEquals(0, storage.revision());
- assertEquals(0, storage.updateCounter());
-
- // Compact non-empty.
- fill(storage, 1, 1);
-
- assertEquals(1, storage.revision());
- assertEquals(1, storage.updateCounter());
-
- fill(storage, 2, 2);
-
- assertEquals(3, storage.revision());
- assertEquals(3, storage.updateCounter());
-
- fill(storage, 3, 3);
-
- assertEquals(6, storage.revision());
- assertEquals(6, storage.updateCounter());
-
- storage.getAndRemove(k(3));
-
- assertEquals(7, storage.revision());
- assertEquals(7, storage.updateCounter());
- assertTrue(storage.get(k(3)).tombstone());
-
- storage.compact();
-
- assertEquals(7, storage.revision());
- assertEquals(7, storage.updateCounter());
-
- Entry e1 = storage.get(k(1));
-
- assertFalse(e1.empty());
- assertFalse(e1.tombstone());
- assertArrayEquals(k(1), e1.key());
- assertArrayEquals(kv(1,1), e1.value());
- assertEquals(1, e1.revision());
- assertEquals(1, e1.updateCounter());
-
- Entry e2 = storage.get(k(2));
-
- assertFalse(e2.empty());
- assertFalse(e2.tombstone());
- assertArrayEquals(k(2), e2.key());
- assertArrayEquals(kv(2,2), e2.value());
- assertTrue(storage.get(k(2), 2).empty());
- assertEquals(3, e2.revision());
- assertEquals(3, e2.updateCounter());
-
- Entry e3 = storage.get(k(3));
-
- assertTrue(e3.empty());
- assertTrue(storage.get(k(3), 5).empty());
- assertTrue(storage.get(k(3), 6).empty());
- assertTrue(storage.get(k(3), 7).empty());
- }
-
- @Test
- public void rangeCursor() {
- byte[] key1 = k(1);
- byte[] val1 = kv(1, 1);
-
- byte[] key2 = k(2);
- byte[] val2 = kv(2, 2);
-
- byte[] key3 = k(3);
- byte[] val3 = kv(3, 3);
-
- assertEquals(0, storage.revision());
- assertEquals(0, storage.updateCounter());
-
- storage.putAll(List.of(key1, key2, key3), List.of(val1, val2, val3));
-
- assertEquals(1, storage.revision());
- assertEquals(3, storage.updateCounter());
-
- // Range for latest revision without max bound.
- Cursor<Entry> cur = storage.range(key1, null);
-
- Iterator<Entry> it = cur.iterator();
-
- assertTrue(it.hasNext());
-
- Entry e1 = it.next();
-
- assertFalse(e1.empty());
- assertFalse(e1.tombstone());
- assertArrayEquals(key1, e1.key());
- assertArrayEquals(val1, e1.value());
- assertEquals(1, e1.revision());
- assertEquals(1, e1.updateCounter());
-
- assertTrue(it.hasNext());
-
- Entry e2 = it.next();
-
- assertFalse(e2.empty());
- assertFalse(e2.tombstone());
- assertArrayEquals(key2, e2.key());
- assertArrayEquals(val2, e2.value());
- assertEquals(1, e2.revision());
- assertEquals(2, e2.updateCounter());
-
- // Deliberately don't call it.hasNext()
-
- Entry e3 = it.next();
-
- assertFalse(e3.empty());
- assertFalse(e3.tombstone());
- assertArrayEquals(key3, e3.key());
- assertArrayEquals(val3, e3.value());
- assertEquals(1, e3.revision());
- assertEquals(3, e3.updateCounter());
-
- assertFalse(it.hasNext());
-
- try {
- it.next();
-
- fail();
- }
- catch (NoSuchElementException e) {
- System.out.println();
- // No-op.
- }
-
- // Range for latest revision with max bound.
- cur = storage.range(key1, key3);
-
- it = cur.iterator();
-
- assertTrue(it.hasNext());
-
- e1 = it.next();
-
- assertFalse(e1.empty());
- assertFalse(e1.tombstone());
- assertArrayEquals(key1, e1.key());
- assertArrayEquals(val1, e1.value());
- assertEquals(1, e1.revision());
- assertEquals(1, e1.updateCounter());
-
- assertTrue(it.hasNext());
-
- e2 = it.next();
-
- assertFalse(e2.empty());
- assertFalse(e2.tombstone());
- assertArrayEquals(key2, e2.key());
- assertArrayEquals(val2, e2.value());
- assertEquals(1, e2.revision());
- assertEquals(2, e2.updateCounter());
-
- assertFalse(it.hasNext());
-
- try {
- it.next();
-
- fail();
- }
- catch (NoSuchElementException e) {
- System.out.println();
- // No-op.
- }
- }
-
- @Test
- public void watchCursorForRange() {
- byte[] key1 = k(1);
- byte[] val1_1 = kv(1, 11);
-
- byte[] key2 = k(2);
- byte[] val2_1 = kv(2, 21);
- byte[] val2_2 = kv(2, 22);
-
- byte[] key3 = k(3);
- byte[] val3_1 = kv(3, 31);
-
- assertEquals(0, storage.revision());
- assertEquals(0, storage.updateCounter());
-
- // Watch for all updates starting from revision 2.
- Cursor<WatchEvent> cur = storage.watch(key1, null, 2);
-
- Iterator<WatchEvent> it = cur.iterator();
-
- assertFalse(it.hasNext());
- assertNull(it.next());
-
- storage.putAll(List.of(key1, key2), List.of(val1_1, val2_1));
-
- assertEquals(1, storage.revision());
- assertEquals(2, storage.updateCounter());
-
- // Revision is less than 2.
- assertFalse(it.hasNext());
- assertNull(it.next());
-
- storage.putAll(List.of(key2, key3), List.of(val2_2, val3_1));
-
- assertEquals(2, storage.revision());
- assertEquals(4, storage.updateCounter());
-
- // Revision is 2.
- assertTrue(it.hasNext());
-
- WatchEvent watchEvent = it.next();
-
- assertFalse(watchEvent.single());
-
- Map<ByteArray, EntryEvent> map = watchEvent.entryEvents().stream()
- .collect(Collectors.toMap(evt -> new ByteArray(evt.entry().key()), identity()));
-
- assertEquals(2, map.size());
-
- // First update under revision.
- EntryEvent e2 = map.get(new ByteArray(key2));
-
- assertNotNull(e2);
-
- Entry oldEntry2 = e2.oldEntry();
-
- assertFalse(oldEntry2.empty());
- assertFalse(oldEntry2.tombstone());
- assertEquals(1, oldEntry2.revision());
- assertEquals(2, oldEntry2.updateCounter());
- assertArrayEquals(key2, oldEntry2.key());
- assertArrayEquals(val2_1, oldEntry2.value());
-
- Entry newEntry2 = e2.entry();
-
- assertFalse(newEntry2.empty());
- assertFalse(newEntry2.tombstone());
- assertEquals(2, newEntry2.revision());
- assertEquals(3, newEntry2.updateCounter());
- assertArrayEquals(key2, newEntry2.key());
- assertArrayEquals(val2_2, newEntry2.value());
-
- // Second update under revision.
- EntryEvent e3 = map.get(new ByteArray(key3));
-
- assertNotNull(e3);
-
- Entry oldEntry3 = e3.oldEntry();
-
- assertTrue(oldEntry3.empty());
- assertFalse(oldEntry3.tombstone());
- assertArrayEquals(key3, oldEntry3.key());
-
- Entry newEntry3 = e3.entry();
-
- assertFalse(newEntry3.empty());
- assertFalse(newEntry3.tombstone());
- assertEquals(2, newEntry3.revision());
- assertEquals(4, newEntry3.updateCounter());
- assertArrayEquals(key3, newEntry3.key());
- assertArrayEquals(val3_1, newEntry3.value());
-
- assertFalse(it.hasNext());
-
- storage.remove(key1);
-
- assertTrue(it.hasNext());
-
- watchEvent = it.next();
-
- assertTrue(watchEvent.single());
-
- EntryEvent e1 = watchEvent.entryEvent();
-
- Entry oldEntry1 = e1.oldEntry();
-
- assertFalse(oldEntry1.empty());
- assertFalse(oldEntry1.tombstone());
- assertEquals(1, oldEntry1.revision());
- assertEquals(1, oldEntry1.updateCounter());
- assertArrayEquals(key1, oldEntry1.key());
- assertArrayEquals(val1_1, oldEntry1.value());
-
- Entry newEntry1 = e1.entry();
-
- assertFalse(newEntry1.empty());
- assertTrue(newEntry1.tombstone());
- assertEquals(3, newEntry1.revision());
- assertEquals(5, newEntry1.updateCounter());
- assertArrayEquals(key1, newEntry1.key());
- assertNull(newEntry1.value());
-
- assertFalse(it.hasNext());
- }
-
- @Test
- public void watchCursorForKey() {
- byte[] key1 = k(1);
- byte[] val1_1 = kv(1, 11);
- byte[] val1_2 = kv(1, 12);
-
- byte[] key2 = k(2);
- byte[] val2_1 = kv(2, 21);
- byte[] val2_2 = kv(2, 22);
-
- assertEquals(0, storage.revision());
- assertEquals(0, storage.updateCounter());
-
- Cursor<WatchEvent> cur = storage.watch(key1, 1);
-
- Iterator<WatchEvent> it = cur.iterator();
-
- assertFalse(it.hasNext());
- assertNull(it.next());
-
- storage.putAll(List.of(key1, key2), List.of(val1_1, val2_1));
-
- assertEquals(1, storage.revision());
- assertEquals(2, storage.updateCounter());
-
- assertTrue(it.hasNext());
-
- WatchEvent watchEvent = it.next();
-
- assertTrue(watchEvent.single());
-
- EntryEvent e1 = watchEvent.entryEvent();
-
- Entry oldEntry1 = e1.oldEntry();
-
- assertTrue(oldEntry1.empty());
- assertFalse(oldEntry1.tombstone());
-
- Entry newEntry1 = e1.entry();
-
- assertFalse(newEntry1.empty());
- assertFalse(newEntry1.tombstone());
- assertEquals(1, newEntry1.revision());
- assertEquals(1, newEntry1.updateCounter());
- assertArrayEquals(key1, newEntry1.key());
- assertArrayEquals(val1_1, newEntry1.value());
-
- assertFalse(it.hasNext());
-
- storage.put(key2, val2_2);
-
- assertFalse(it.hasNext());
-
- storage.put(key1, val1_2);
-
- assertTrue(it.hasNext());
-
- watchEvent = it.next();
-
- assertTrue(watchEvent.single());
-
- e1 = watchEvent.entryEvent();
-
- oldEntry1 = e1.oldEntry();
-
- assertFalse(oldEntry1.empty());
- assertFalse(oldEntry1.tombstone());
- assertEquals(1, oldEntry1.revision());
- assertEquals(1, oldEntry1.updateCounter());
- assertArrayEquals(key1, newEntry1.key());
- assertArrayEquals(val1_1, newEntry1.value());
-
- newEntry1 = e1.entry();
-
- assertFalse(newEntry1.empty());
- assertFalse(newEntry1.tombstone());
- assertEquals(3, newEntry1.revision());
- assertEquals(4, newEntry1.updateCounter());
- assertArrayEquals(key1, newEntry1.key());
- assertArrayEquals(val1_2, newEntry1.value());
-
- assertFalse(it.hasNext());
- }
-
- @Test
- public void watchCursorForKeys() {
- byte[] key1 = k(1);
- byte[] val1_1 = kv(1, 11);
-
- byte[] key2 = k(2);
- byte[] val2_1 = kv(2, 21);
- byte[] val2_2 = kv(2, 22);
-
- byte[] key3 = k(3);
- byte[] val3_1 = kv(3, 31);
- byte[] val3_2 = kv(3, 32);
-
- assertEquals(0, storage.revision());
- assertEquals(0, storage.updateCounter());
-
- Cursor<WatchEvent> cur = storage.watch(List.of(key1, key2), 1);
-
- Iterator<WatchEvent> it = cur.iterator();
-
- assertFalse(it.hasNext());
- assertNull(it.next());
-
- storage.putAll(List.of(key1, key2, key3), List.of(val1_1, val2_1, val3_1));
-
- assertEquals(1, storage.revision());
- assertEquals(3, storage.updateCounter());
-
- assertTrue(it.hasNext());
-
- WatchEvent watchEvent = it.next();
-
- assertFalse(watchEvent.single());
-
- assertFalse(it.hasNext());
-
- storage.put(key2, val2_2);
-
- assertTrue(it.hasNext());
-
- watchEvent = it.next();
-
- assertTrue(watchEvent.single());
-
- assertFalse(it.hasNext());
-
- storage.put(key3, val3_2);
-
- assertFalse(it.hasNext());
- }
-
- /** */
- private static void fill(KeyValueStorage storage, int keySuffix, int num) {
- for (int i = 0; i < num; i++)
- storage.getAndPut(k(keySuffix), kv(keySuffix, i + 1));
- }
-
- /** */
- private static byte[] k(int k) {
- return ("key" + k).getBytes();
- }
-
- /** */
- private static byte[] kv(int k, int v) {
- return ("key" + k + '_' + "val" + v).getBytes();
+class SimpleInMemoryKeyValueStorageTest extends AbstractKeyValueStorageTest {
+ /** {@inheritDoc} */
+ @Override KeyValueStorage storage() {
+ return new SimpleInMemoryKeyValueStorage();
}
}
diff --git a/modules/raft-client/src/main/java/org/apache/ignite/raft/client/service/RaftGroupListener.java b/modules/raft-client/src/main/java/org/apache/ignite/raft/client/service/RaftGroupListener.java
index 9a09282..e186fdf 100644
--- a/modules/raft-client/src/main/java/org/apache/ignite/raft/client/service/RaftGroupListener.java
+++ b/modules/raft-client/src/main/java/org/apache/ignite/raft/client/service/RaftGroupListener.java
@@ -17,6 +17,7 @@
package org.apache.ignite.raft.client.service;
+import java.nio.file.Path;
import java.util.Iterator;
import java.util.function.Consumer;
import org.apache.ignite.raft.client.ReadCommand;
@@ -56,7 +57,7 @@ public interface RaftGroupListener {
* @param doneClo The closure to call on finish. Pass the not null exception if the snapshot has not been created or
* null on successful creation.
*/
- public void onSnapshotSave(String path, Consumer<Throwable> doneClo);
+ public void onSnapshotSave(Path path, Consumer<Throwable> doneClo);
/**
* The callback to load a snapshot.
@@ -64,5 +65,10 @@ public interface RaftGroupListener {
* @param path Snapshot directory.
* @return {@code True} if the snapshot was loaded successfully.
*/
- boolean onSnapshotLoad(String path);
+ boolean onSnapshotLoad(Path path);
+
+ /**
+ * Invoked once after a raft node has been shut down.
+ */
+ void onShutdown();
}
diff --git a/modules/raft/src/integrationTest/java/org/apache/ignite/raft/server/CounterListener.java b/modules/raft/src/integrationTest/java/org/apache/ignite/raft/server/CounterListener.java
index c692a66..c0a856e 100644
--- a/modules/raft/src/integrationTest/java/org/apache/ignite/raft/server/CounterListener.java
+++ b/modules/raft/src/integrationTest/java/org/apache/ignite/raft/server/CounterListener.java
@@ -19,6 +19,7 @@ package org.apache.ignite.raft.server;
import java.io.File;
import java.io.IOException;
+import java.nio.file.Path;
import java.util.Iterator;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
@@ -73,7 +74,7 @@ public class CounterListener implements RaftGroupListener {
}
/** {@inheritDoc} */
- @Override public void onSnapshotSave(String path, Consumer<Throwable> doneClo) {
+ @Override public void onSnapshotSave(Path path, Consumer<Throwable> doneClo) {
final long currVal = this.counter.get();
Utils.runInThread(executor, () -> {
@@ -90,8 +91,9 @@ public class CounterListener implements RaftGroupListener {
});
}
- /** {@inheritDoc} */
- @Override public boolean onSnapshotLoad(String path) {
+ /** {@inheritDoc}
+ * @param path Snapshot directory. */
+ @Override public boolean onSnapshotLoad(Path path) {
final CounterSnapshotFile snapshot = new CounterSnapshotFile(path + File.separator + "data");
try {
this.counter.set(snapshot.load());
@@ -103,6 +105,11 @@ public class CounterListener implements RaftGroupListener {
}
}
+ /** {@inheritDoc} */
+ @Override public void onShutdown() {
+ // No-op.
+ }
+
/**
* @return Current value.
*/
diff --git a/modules/raft/src/integrationTest/java/org/apache/ignite/raft/server/ITJRaftCounterServerTest.java b/modules/raft/src/integrationTest/java/org/apache/ignite/raft/server/ITJRaftCounterServerTest.java
index c42fb37..a1c719a 100644
--- a/modules/raft/src/integrationTest/java/org/apache/ignite/raft/server/ITJRaftCounterServerTest.java
+++ b/modules/raft/src/integrationTest/java/org/apache/ignite/raft/server/ITJRaftCounterServerTest.java
@@ -311,7 +311,7 @@ class ITJRaftCounterServerTest extends RaftServerAbstractTest {
@Test
public void testCreateSnapshotGracefulFailure() throws Exception {
listenerFactory = () -> new CounterListener() {
- @Override public void onSnapshotSave(String path, Consumer<Throwable> doneClo) {
+ @Override public void onSnapshotSave(Path path, Consumer<Throwable> doneClo) {
doneClo.accept(new IgniteInternalException("Very bad"));
}
};
@@ -345,7 +345,7 @@ class ITJRaftCounterServerTest extends RaftServerAbstractTest {
@Test
public void testCreateSnapshotAbnormalFailure() throws Exception {
listenerFactory = () -> new CounterListener() {
- @Override public void onSnapshotSave(String path, Consumer<Throwable> doneClo) {
+ @Override public void onSnapshotSave(Path path, Consumer<Throwable> doneClo) {
doneClo.accept(new IgniteInternalException("Very bad"));
}
};
diff --git a/modules/raft/src/main/java/org/apache/ignite/internal/raft/server/impl/JRaftServerImpl.java b/modules/raft/src/main/java/org/apache/ignite/internal/raft/server/impl/JRaftServerImpl.java
index 2dfc7a4..5d46122 100644
--- a/modules/raft/src/main/java/org/apache/ignite/internal/raft/server/impl/JRaftServerImpl.java
+++ b/modules/raft/src/main/java/org/apache/ignite/internal/raft/server/impl/JRaftServerImpl.java
@@ -19,6 +19,7 @@ package org.apache.ignite.internal.raft.server.impl;
import java.io.File;
import java.io.Serializable;
import java.nio.ByteBuffer;
+import java.nio.file.Path;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
@@ -278,7 +279,7 @@ public class JRaftServerImpl implements RaftServer {
/** {@inheritDoc} */
@Override public void onSnapshotSave(SnapshotWriter writer, Closure done) {
try {
- listener.onSnapshotSave(writer.getPath(), res -> {
+ listener.onSnapshotSave(Path.of(writer.getPath()), res -> {
if (res == null) {
File file = new File(writer.getPath());
@@ -302,7 +303,12 @@ public class JRaftServerImpl implements RaftServer {
/** {@inheritDoc} */
@Override public boolean onSnapshotLoad(SnapshotReader reader) {
- return listener.onSnapshotLoad(reader.getPath());
+ return listener.onSnapshotLoad(Path.of(reader.getPath()));
+ }
+
+ /** {@inheritDoc} */
+ @Override public void onShutdown() {
+ listener.onShutdown();
}
}
}
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionListener.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionListener.java
index 34141e8..4f34d0d 100644
--- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionListener.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionListener.java
@@ -17,6 +17,7 @@
package org.apache.ignite.internal.table.distributed.raft;
+import java.nio.file.Path;
import java.util.Arrays;
import java.util.Iterator;
import java.util.Objects;
@@ -268,16 +269,21 @@ public class PartitionListener implements RaftGroupListener {
}
/** {@inheritDoc} */
- @Override public void onSnapshotSave(String path, Consumer<Throwable> doneClo) {
+ @Override public void onSnapshotSave(Path path, Consumer<Throwable> doneClo) {
// Not implemented yet.
}
/** {@inheritDoc} */
- @Override public boolean onSnapshotLoad(String path) {
+ @Override public boolean onSnapshotLoad(Path path) {
// Not implemented yet.
return false;
}
+ /** {@inheritDoc} */
+ @Override public void onShutdown() {
+ // No-op.
+ }
+
/**
* Wrapper provides correct byte[] comparison.
*/