You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by se...@apache.org on 2021/08/25 07:18:39 UTC
[ignite-3] branch main updated: IGNITE-15298 RAFT snapshots
implementation for persistent partitions storage - Fixes #285.
This is an automated email from the ASF dual-hosted git repository.
sergeychugunov pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new e73d124 IGNITE-15298 RAFT snapshots implementation for persistent partitions storage - Fixes #285.
e73d124 is described below
commit e73d1241464dc07b506eb2c088c739b516101d1e
Author: Semyon Danilov <sa...@yandex.ru>
AuthorDate: Wed Aug 25 10:17:51 2021 +0300
IGNITE-15298 RAFT snapshots implementation for persistent partitions storage - Fixes #285.
Signed-off-by: Sergey Chugunov <se...@gmail.com>
---
.../ITMetaStorageServicePersistenceTest.java | 391 +++------------------
modules/metastorage-server/pom.xml | 4 +-
.../server/persistence/RocksDBKeyValueStorage.java | 55 +--
.../server/persistence/RocksStorageUtils.java | 86 -----
.../server/persistence/WatchCursor.java | 5 +-
.../raft/client/service/RaftGroupListener.java | 2 +-
modules/raft/pom.xml | 6 +
.../service/ITAbstractListenerSnapshotTest.java} | 237 +++++--------
.../{storage-rocksdb => rocksdb-common}/pom.xml | 31 +-
.../ignite/internal/rocksdb}/ColumnFamily.java | 55 ++-
.../ignite/internal/rocksdb/RocksBiConsumer.java | 35 ++
.../ignite/internal/rocksdb/RocksBiPredicate.java | 36 ++
.../apache/ignite/internal/rocksdb/RocksUtils.java | 119 +++++++
.../ignite/internal/storage/InvokeClosure.java | 2 +-
.../apache/ignite/internal/storage/Storage.java | 19 +
.../storage/basic/ConcurrentHashMapStorage.java | 12 +
modules/storage-rocksdb/pom.xml | 5 +-
.../internal/storage/rocksdb/RocksDbStorage.java | 178 ++++++++--
modules/table/pom.xml | 7 +
.../ignite/distributed/ITTablePersistenceTest.java | 156 ++++++++
.../table/distributed/raft/PartitionListener.java | 17 +-
parent/pom.xml | 6 +
pom.xml | 1 +
23 files changed, 776 insertions(+), 689 deletions(-)
diff --git a/modules/metastorage-client/src/integrationTest/java/org/apache/ignite/internal/metastorage/client/ITMetaStorageServicePersistenceTest.java b/modules/metastorage-client/src/integrationTest/java/org/apache/ignite/internal/metastorage/client/ITMetaStorageServicePersistenceTest.java
index fc5632b..f1aa1a4 100644
--- a/modules/metastorage-client/src/integrationTest/java/org/apache/ignite/internal/metastorage/client/ITMetaStorageServicePersistenceTest.java
+++ b/modules/metastorage-client/src/integrationTest/java/org/apache/ignite/internal/metastorage/client/ITMetaStorageServicePersistenceTest.java
@@ -17,276 +17,112 @@
package org.apache.ignite.internal.metastorage.client;
-import java.net.InetAddress;
-import java.net.UnknownHostException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Path;
-import java.util.ArrayList;
import java.util.Arrays;
-import java.util.List;
import java.util.UUID;
import java.util.concurrent.ExecutionException;
-import java.util.concurrent.TimeUnit;
import java.util.function.BooleanSupplier;
-import java.util.stream.Collectors;
-import java.util.stream.IntStream;
import org.apache.ignite.internal.metastorage.server.KeyValueStorage;
import org.apache.ignite.internal.metastorage.server.persistence.RocksDBKeyValueStorage;
import org.apache.ignite.internal.metastorage.server.raft.MetaStorageListener;
import org.apache.ignite.internal.raft.server.impl.JRaftServerImpl;
-import org.apache.ignite.internal.raft.server.impl.JRaftServerImpl.DelegatingStateMachine;
-import org.apache.ignite.internal.testframework.WorkDirectory;
import org.apache.ignite.internal.testframework.WorkDirectoryExtension;
-import org.apache.ignite.internal.util.IgniteUtils;
import org.apache.ignite.lang.ByteArray;
-import org.apache.ignite.network.ClusterService;
-import org.apache.ignite.network.ClusterServiceFactory;
-import org.apache.ignite.network.MessageSerializationRegistryImpl;
-import org.apache.ignite.network.NetworkAddress;
-import org.apache.ignite.network.StaticNodeFinder;
-import org.apache.ignite.network.scalecube.TestScaleCubeClusterServiceFactory;
-import org.apache.ignite.network.serialization.MessageSerializationRegistry;
-import org.apache.ignite.raft.client.Peer;
-import org.apache.ignite.raft.client.message.RaftClientMessagesFactory;
+import org.apache.ignite.raft.client.service.ITAbstractListenerSnapshotTest;
+import org.apache.ignite.raft.client.service.RaftGroupListener;
import org.apache.ignite.raft.client.service.RaftGroupService;
-import org.apache.ignite.raft.client.service.impl.RaftGroupServiceImpl;
-import org.apache.ignite.utils.ClusterServiceTestUtils;
-import org.junit.jupiter.api.AfterEach;
-import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.extension.ExtendWith;
-import org.junit.jupiter.params.ParameterizedTest;
-import org.junit.jupiter.params.provider.MethodSource;
-import static java.lang.Thread.sleep;
import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.junit.jupiter.api.Assertions.assertTrue;
/**
- * Persistent (rocksdb-based) meta storage client tests.
+ * Persistent (rocksdb-based) meta storage raft group snapshots tests.
*/
@ExtendWith(WorkDirectoryExtension.class)
-public class ITMetaStorageServicePersistenceTest {
- /** Starting server port. */
- private static final int PORT = 5003;
-
- /** Starting client port. */
- private static final int CLIENT_PORT = 6003;
-
- /**
- * Peers list.
- */
- private static final List<Peer> INITIAL_CONF = IntStream.rangeClosed(0, 2)
- .mapToObj(i -> new NetworkAddress(getLocalAddress(), PORT + i))
- .map(Peer::new)
- .collect(Collectors.toUnmodifiableList());
-
+public class ITMetaStorageServicePersistenceTest extends ITAbstractListenerSnapshotTest<MetaStorageListener> {
/** */
- private static final String METASTORAGE_RAFT_GROUP_NAME = "METASTORAGE_RAFT_GROUP";
-
- /** Factory. */
- private static final RaftClientMessagesFactory FACTORY = new RaftClientMessagesFactory();
-
- /** Network factory. */
- private static final ClusterServiceFactory NETWORK_FACTORY = new TestScaleCubeClusterServiceFactory();
+ private static final ByteArray FIRST_KEY = ByteArray.fromString("first");
/** */
- private static final MessageSerializationRegistry SERIALIZATION_REGISTRY = new MessageSerializationRegistryImpl();
+ private static final byte[] FIRST_VALUE = "firstValue".getBytes(StandardCharsets.UTF_8);
/** */
- @WorkDirectory
- private Path workDir;
-
- /** Cluster. */
- private final List<ClusterService> cluster = new ArrayList<>();
-
- /** Servers. */
- private final List<JRaftServerImpl> servers = new ArrayList<>();
-
- /** Clients. */
- private final List<RaftGroupService> clients = new ArrayList<>();
-
- /**
- * Shutdown raft server and stop all cluster nodes.
- *
- * @throws Exception If failed to shutdown raft server,
- */
- @AfterEach
- public void afterTest() throws Exception {
- for (RaftGroupService client : clients)
- client.shutdown();
-
- for (JRaftServerImpl server : servers)
- server.stop();
-
- for (ClusterService service : cluster)
- service.stop();
- }
-
- /**
- * Test parameters for {@link #testSnapshot}.
- */
- private static class TestData {
- /** Delete raft group folder. */
- private final boolean deleteFolder;
-
- /** Write to meta storage after a snapshot. */
- private final boolean writeAfterSnapshot;
-
- /** */
- private TestData(boolean deleteFolder, boolean writeAfterSnapshot) {
- this.deleteFolder = deleteFolder;
- this.writeAfterSnapshot = writeAfterSnapshot;
- }
-
- /** {@inheritDoc} */
- @Override public String toString() {
- return String.format("deleteFolder=%s, writeAfterSnapshot=%s", deleteFolder, writeAfterSnapshot);
- }
- }
-
- /**
- * @return {@link #testSnapshot} parameters.
- */
- private static List<TestData> testSnapshotData() {
- return List.of(
- new TestData(false, false),
- new TestData(true, true),
- new TestData(false, true),
- new TestData(true, false)
- );
- }
+ private static final ByteArray SECOND_KEY = ByteArray.fromString("second");
- /**
- * Tests that a joining raft node successfully restores a snapshot.
- *
- * @param testData Test parameters.
- * @throws Exception If failed.
- */
- @ParameterizedTest
- @MethodSource("testSnapshotData")
- @Disabled("https://issues.apache.org/jira/browse/IGNITE-15298")
- public void testSnapshot(TestData testData) throws Exception {
- ByteArray firstKey = ByteArray.fromString("first");
- byte[] firstValue = "firstValue".getBytes(StandardCharsets.UTF_8);
+ /** */
+ private static final byte[] SECOND_VALUE = "secondValue".getBytes(StandardCharsets.UTF_8);
- // Setup a metastorage raft service
- RaftGroupService metaStorageSvc = prepareMetaStorage();
+ /** */
+ private MetaStorageServiceImpl metaStorage;
- MetaStorageServiceImpl metaStorage = new MetaStorageServiceImpl(metaStorageSvc, null);
+ /** {@inheritDoc} */
+ @Override public void beforeFollowerStop(RaftGroupService service) throws Exception {
+ metaStorage = new MetaStorageServiceImpl(service, null);
// Put some data in the metastorage
- metaStorage.put(firstKey, firstValue).get();
+ metaStorage.put(FIRST_KEY, FIRST_VALUE).get();
// Check that data has been written successfully
- check(metaStorage, new EntryImpl(firstKey, firstValue, 1, 1));;
-
- // Select any node that is not the leader of the group
- JRaftServerImpl toStop = servers.stream()
- .filter(server -> !server.localPeer(METASTORAGE_RAFT_GROUP_NAME).equals(metaStorageSvc.leader()))
- .findAny()
- .orElseThrow();
-
- // Get the path to that node's raft directory
- Path serverDataPath = toStop.getServerDataPath(METASTORAGE_RAFT_GROUP_NAME);
-
- // Get the path to that node's RocksDB key-value storage
- Path dbPath = getStorage(toStop).getDbPath();
-
- int stopIdx = servers.indexOf(toStop);
-
- // Remove that node from the list of servers
- servers.remove(stopIdx);
-
- // Shutdown that node
- toStop.stop();
-
- // Create a raft snapshot of the metastorage service
- metaStorageSvc.snapshot(metaStorageSvc.leader()).get();
+ check(metaStorage, new EntryImpl(FIRST_KEY, FIRST_VALUE, 1, 1));;
+ }
+ /** {@inheritDoc} */
+ @Override public void afterFollowerStop(RaftGroupService service) throws Exception {
// Remove the first key from the metastorage
- metaStorage.remove(firstKey).get();
+ metaStorage.remove(FIRST_KEY).get();
// Check that data has been removed
- check(metaStorage, new EntryImpl(firstKey, null, 2, 2));
+ check(metaStorage, new EntryImpl(FIRST_KEY, null, 2, 2));
// Put same data again
- metaStorage.put(firstKey, firstValue).get();
+ metaStorage.put(FIRST_KEY, FIRST_VALUE).get();
// Check that it has been written
- check(metaStorage, new EntryImpl(firstKey, firstValue, 3, 3));
-
- // Create another raft snapshot
- metaStorageSvc.snapshot(metaStorageSvc.leader()).get();
-
- byte[] lastKey = firstKey.bytes();
- byte[] lastValue = firstValue;
-
- if (testData.deleteFolder) {
- // Delete stopped node's raft directory and key-value storage directory
- // to check if snapshot could be restored by the restarted node
- IgniteUtils.deleteIfExists(dbPath);
- IgniteUtils.deleteIfExists(serverDataPath);
- }
-
- if (testData.writeAfterSnapshot) {
- // Put new data after the second snapshot to check if after
- // the snapshot restore operation restarted node will receive it
- ByteArray secondKey = ByteArray.fromString("second");
- byte[] secondValue = "secondValue".getBytes(StandardCharsets.UTF_8);
-
- metaStorage.put(secondKey, secondValue).get();
-
- lastKey = secondKey.bytes();
- lastValue = secondValue;
- }
-
- // Restart the node
- JRaftServerImpl restarted = startServer(stopIdx, new RocksDBKeyValueStorage(dbPath));
+ check(metaStorage, new EntryImpl(FIRST_KEY, FIRST_VALUE, 3, 3));
+ }
- assertTrue(waitForTopology(cluster.get(0), servers.size(), 3_000));
+ /** {@inheritDoc} */
+ @Override public void afterSnapshot(RaftGroupService service) throws Exception {
+ metaStorage.put(SECOND_KEY, SECOND_VALUE).get();
+ }
- KeyValueStorage storage = getStorage(restarted);
+ /** {@inheritDoc} */
+ @Override public BooleanSupplier snapshotCheckClosure(JRaftServerImpl restarted, boolean interactedAfterSnapshot) {
+ KeyValueStorage storage = getListener(restarted, raftGroupId()).getStorage();
- byte[] finalLastKey = lastKey;
+ byte[] lastKey = interactedAfterSnapshot ? SECOND_KEY.bytes() : FIRST_KEY.bytes();
+ byte[] lastValue = interactedAfterSnapshot ? SECOND_VALUE : FIRST_VALUE;
- int expectedRevision = testData.writeAfterSnapshot ? 4 : 3;
- int expectedUpdateCounter = testData.writeAfterSnapshot ? 4 : 3;
+ int expectedRevision = interactedAfterSnapshot ? 4 : 3;
+ int expectedUpdateCounter = interactedAfterSnapshot ? 4 : 3;
EntryImpl expectedLastEntry = new EntryImpl(new ByteArray(lastKey), lastValue, expectedRevision, expectedUpdateCounter);
- // Wait until the snapshot is restored
- boolean success = waitForCondition(() -> {
- org.apache.ignite.internal.metastorage.server.Entry e = storage.get(finalLastKey);
+ return () -> {
+ org.apache.ignite.internal.metastorage.server.Entry e = storage.get(lastKey);
return e.empty() == expectedLastEntry.empty()
&& e.tombstone() == expectedLastEntry.tombstone()
&& e.revision() == expectedLastEntry.revision()
&& e.updateCounter() == expectedLastEntry.revision()
&& Arrays.equals(e.key(), expectedLastEntry.key().bytes())
&& Arrays.equals(e.value(), expectedLastEntry.value());
- }, 3_000);
-
- assertTrue(success);
-
- // Check that the last value has been written successfully
- check(metaStorage, expectedLastEntry);
+ };
}
- /**
- * Get the meta store's key-value storage of the jraft server.
- *
- * @param server Server.
- * @return Meta store's key value storage.
- */
- private static RocksDBKeyValueStorage getStorage(JRaftServerImpl server) {
- org.apache.ignite.raft.jraft.RaftGroupService svc = server.raftGroupService(METASTORAGE_RAFT_GROUP_NAME);
-
- DelegatingStateMachine fsm = (DelegatingStateMachine) svc.getRaftNode().getOptions().getFsm();
-
- MetaStorageListener listener = (MetaStorageListener) fsm.getListener();
+ /** {@inheritDoc} */
+ @Override public Path getListenerPersistencePath(MetaStorageListener listener) {
+ return ((RocksDBKeyValueStorage) listener.getStorage()).getDbPath();
+ }
- KeyValueStorage storage = listener.getStorage();
+ /** {@inheritDoc} */
+ @Override public RaftGroupListener createListener(Path workDir) {
+ return new MetaStorageListener(new RocksDBKeyValueStorage(workDir.resolve(UUID.randomUUID().toString())));
+ }
- return (RocksDBKeyValueStorage) storage;
+ /** {@inheritDoc} */
+ @Override public String raftGroupId() {
+ return "metastorage";
}
/**
@@ -303,133 +139,4 @@ public class ITMetaStorageServicePersistenceTest {
assertEquals(expected, entry);
}
-
- /** */
- @SuppressWarnings("BusyWait") private static boolean waitForCondition(BooleanSupplier cond, long timeout) {
- long stop = System.currentTimeMillis() + timeout;
-
- while (System.currentTimeMillis() < stop) {
- if (cond.getAsBoolean())
- return true;
-
- try {
- sleep(50);
- }
- catch (InterruptedException e) {
- return false;
- }
- }
-
- return false;
- }
-
- /**
- * @param cluster The cluster.
- * @param exp Expected count.
- * @param timeout The timeout in millis.
- * @return {@code True} if topology size is equal to expected.
- */
- private boolean waitForTopology(ClusterService cluster, int exp, int timeout) {
- return waitForCondition(() -> cluster.topologyService().allMembers().size() >= exp, timeout);
- }
-
- /**
- * @return Local address.
- */
- private static String getLocalAddress() {
- try {
- return InetAddress.getLocalHost().getHostAddress();
- }
- catch (UnknownHostException e) {
- throw new RuntimeException(e);
- }
- }
-
- /**
- * Creates a cluster service.
- */
- private ClusterService clusterService(String name, int port, NetworkAddress otherPeer) {
- var nodeFinder = new StaticNodeFinder(List.of(otherPeer));
-
- var network = ClusterServiceTestUtils.clusterService(
- name,
- port,
- nodeFinder,
- SERIALIZATION_REGISTRY,
- NETWORK_FACTORY
- );
-
- network.start();
-
- cluster.add(network);
-
- return network;
- }
-
- /**
- * Starts a raft server.
- *
- * @param idx Server index (affects port of the server).
- * @param storage KeyValueStorage for the MetaStorage.
- * @return Server.
- */
- private JRaftServerImpl startServer(int idx, KeyValueStorage storage) {
- var addr = new NetworkAddress(getLocalAddress(), PORT);
-
- ClusterService service = clusterService("server" + idx, PORT + idx, addr);
-
- Path jraft = workDir.resolve("jraft" + idx);
-
- JRaftServerImpl server = new JRaftServerImpl(service, jraft) {
- @Override public void stop() {
- super.stop();
-
- service.stop();
- }
- };
-
- server.start();
-
- server.startRaftGroup(
- METASTORAGE_RAFT_GROUP_NAME,
- new MetaStorageListener(storage),
- INITIAL_CONF
- );
-
- servers.add(server);
-
- return server;
- }
-
- /**
- * Prepares meta storage by instantiating corresponding raft server with {@link MetaStorageListener} and
- * a client.
- *
- * @return Meta storage raft group service instance.
- * @throws Exception If failed.
- */
- private RaftGroupService prepareMetaStorage() throws Exception {
- for (int i = 0; i < INITIAL_CONF.size(); i++)
- startServer(i, new RocksDBKeyValueStorage(workDir.resolve(UUID.randomUUID().toString())));
-
- assertTrue(waitForTopology(cluster.get(0), servers.size(), 3_000));
-
- return startClient(METASTORAGE_RAFT_GROUP_NAME, new NetworkAddress(getLocalAddress(), PORT));
- }
-
- /**
- * Starts a client with a specific address.
- *
- * @throws Exception If failed.
- */
- private RaftGroupService startClient(String groupId, NetworkAddress addr) throws Exception {
- ClusterService clientNode = clusterService("client_" + groupId + "_", CLIENT_PORT + clients.size(), addr);
-
- RaftGroupService client = RaftGroupServiceImpl.start(groupId, clientNode, FACTORY, 10_000,
- List.of(new Peer(addr)), false, 200).get(3, TimeUnit.SECONDS);
-
- clients.add(client);
-
- return client;
- }
}
diff --git a/modules/metastorage-server/pom.xml b/modules/metastorage-server/pom.xml
index 07bb1df..d8a5c76 100644
--- a/modules/metastorage-server/pom.xml
+++ b/modules/metastorage-server/pom.xml
@@ -44,8 +44,8 @@
</dependency>
<dependency>
- <groupId>org.rocksdb</groupId>
- <artifactId>rocksdbjni</artifactId>
+ <groupId>org.apache.ignite</groupId>
+ <artifactId>ignite-rocksdb-common</artifactId>
</dependency>
<dependency>
diff --git a/modules/metastorage-server/src/main/java/org/apache/ignite/internal/metastorage/server/persistence/RocksDBKeyValueStorage.java b/modules/metastorage-server/src/main/java/org/apache/ignite/internal/metastorage/server/persistence/RocksDBKeyValueStorage.java
index 507307d..8aea029 100644
--- a/modules/metastorage-server/src/main/java/org/apache/ignite/internal/metastorage/server/persistence/RocksDBKeyValueStorage.java
+++ b/modules/metastorage-server/src/main/java/org/apache/ignite/internal/metastorage/server/persistence/RocksDBKeyValueStorage.java
@@ -42,6 +42,8 @@ import org.apache.ignite.internal.metastorage.server.KeyValueStorage;
import org.apache.ignite.internal.metastorage.server.Operation;
import org.apache.ignite.internal.metastorage.server.Value;
import org.apache.ignite.internal.metastorage.server.WatchEvent;
+import org.apache.ignite.internal.rocksdb.ColumnFamily;
+import org.apache.ignite.internal.rocksdb.RocksBiPredicate;
import org.apache.ignite.internal.util.Cursor;
import org.apache.ignite.internal.util.IgniteUtils;
import org.apache.ignite.lang.IgniteBiTuple;
@@ -53,7 +55,6 @@ import org.rocksdb.ColumnFamilyDescriptor;
import org.rocksdb.ColumnFamilyHandle;
import org.rocksdb.ColumnFamilyOptions;
import org.rocksdb.DBOptions;
-import org.rocksdb.EnvOptions;
import org.rocksdb.IngestExternalFileOptions;
import org.rocksdb.Options;
import org.rocksdb.ReadOptions;
@@ -61,7 +62,6 @@ import org.rocksdb.RocksDB;
import org.rocksdb.RocksDBException;
import org.rocksdb.RocksIterator;
import org.rocksdb.Snapshot;
-import org.rocksdb.SstFileWriter;
import org.rocksdb.WriteBatch;
import org.rocksdb.WriteOptions;
@@ -69,14 +69,15 @@ import static org.apache.ignite.internal.metastorage.server.Value.TOMBSTONE;
import static org.apache.ignite.internal.metastorage.server.persistence.RocksStorageUtils.appendLong;
import static org.apache.ignite.internal.metastorage.server.persistence.RocksStorageUtils.bytesToLong;
import static org.apache.ignite.internal.metastorage.server.persistence.RocksStorageUtils.bytesToValue;
-import static org.apache.ignite.internal.metastorage.server.persistence.RocksStorageUtils.find;
-import static org.apache.ignite.internal.metastorage.server.persistence.RocksStorageUtils.forEach;
import static org.apache.ignite.internal.metastorage.server.persistence.RocksStorageUtils.getAsLongs;
import static org.apache.ignite.internal.metastorage.server.persistence.RocksStorageUtils.keyToRocksKey;
import static org.apache.ignite.internal.metastorage.server.persistence.RocksStorageUtils.longToBytes;
import static org.apache.ignite.internal.metastorage.server.persistence.RocksStorageUtils.valueToBytes;
import static org.apache.ignite.internal.metastorage.server.persistence.StorageColumnFamilyType.DATA;
import static org.apache.ignite.internal.metastorage.server.persistence.StorageColumnFamilyType.INDEX;
+import static org.apache.ignite.internal.rocksdb.RocksUtils.createSstFile;
+import static org.apache.ignite.internal.rocksdb.RocksUtils.find;
+import static org.apache.ignite.internal.rocksdb.RocksUtils.forEach;
import static org.apache.ignite.internal.util.ArrayUtils.LONG_EMPTY_ARRAY;
/**
@@ -187,9 +188,9 @@ public class RocksDBKeyValueStorage implements KeyValueStorage {
this.db = RocksDB.open(options, dbPath.toAbsolutePath().toString(), descriptors, handles);
- data = new ColumnFamily(db, handles.get(0), DATA, dataFamilyOptions, dataOptions);
+ data = new ColumnFamily(db, handles.get(0), DATA.name(), dataFamilyOptions, dataOptions);
- index = new ColumnFamily(db, handles.get(1), INDEX, indexFamilyOptions, indexOptions);
+ index = new ColumnFamily(db, handles.get(1), INDEX.name(), indexFamilyOptions, indexOptions);
}
catch (Exception e) {
try {
@@ -246,8 +247,11 @@ public class RocksDBKeyValueStorage implements KeyValueStorage {
}
}, snapshotExecutor).thenCompose(aVoid ->
// Create futures for capturing SST snapshots of the column families
- CompletableFuture.allOf(createSstFile(data, snapshot, tempPath), createSstFile(index, snapshot, tempPath)))
- .whenComplete((aVoid, throwable) -> {
+ CompletableFuture.allOf(
+ CompletableFuture.runAsync(() -> createSstFile(data, snapshot, tempPath), snapshotExecutor),
+ CompletableFuture.runAsync(() -> createSstFile(index, snapshot, tempPath), snapshotExecutor)
+ )
+ ).whenComplete((aVoid, throwable) -> {
// Release a snapshot
db.releaseSnapshot(snapshot);
@@ -271,39 +275,6 @@ public class RocksDBKeyValueStorage implements KeyValueStorage {
});
}
- /**
- * Create an SST file for the column family.
- *
- * @param columnFamily Column family.
- * @param snapshot Point-in-time snapshot.
- * @param path Directory to put the SST file in.
- * @return Future representing pending completion of the operation.
- */
- private CompletableFuture<Void> createSstFile(ColumnFamily columnFamily, Snapshot snapshot, Path path) {
- return CompletableFuture.runAsync(() -> {
- try (
- EnvOptions envOptions = new EnvOptions();
- Options options = new Options();
- ReadOptions readOptions = new ReadOptions().setSnapshot(snapshot);
- RocksIterator it = columnFamily.newIterator(readOptions);
- SstFileWriter sstFileWriter = new SstFileWriter(envOptions, options)
- ) {
- Path sstFile = path.resolve(columnFamily.name());
-
- sstFileWriter.open(sstFile.toString());
-
- it.seekToFirst();
-
- forEach(it, sstFileWriter::put);
-
- sstFileWriter.finish();
- }
- catch (Throwable t) {
- throw new IgniteInternalException("Failed to write snapshot: " + t.getMessage(), t);
- }
- }, snapshotExecutor);
- }
-
/** {@inheritDoc} */
@Override public void restoreSnapshot(Path path) {
rwLock.writeLock().lock();
@@ -1034,7 +1005,7 @@ public class RocksDBKeyValueStorage implements KeyValueStorage {
try (RocksIterator iterator = index.newIterator()) {
iterator.seek(key);
- RocksStorageUtils.RocksBiPredicate predicate = strictlyHigher ?
+ RocksBiPredicate predicate = strictlyHigher ?
(k, v) -> CMP.compare(k, key) > 0 :
(k, v) -> CMP.compare(k, key) >= 0;
diff --git a/modules/metastorage-server/src/main/java/org/apache/ignite/internal/metastorage/server/persistence/RocksStorageUtils.java b/modules/metastorage-server/src/main/java/org/apache/ignite/internal/metastorage/server/persistence/RocksStorageUtils.java
index 54d13d7..0374ef1 100644
--- a/modules/metastorage-server/src/main/java/org/apache/ignite/internal/metastorage/server/persistence/RocksStorageUtils.java
+++ b/modules/metastorage-server/src/main/java/org/apache/ignite/internal/metastorage/server/persistence/RocksStorageUtils.java
@@ -23,11 +23,8 @@ import java.nio.ByteOrder;
import java.util.Arrays;
import java.util.stream.IntStream;
import org.apache.ignite.internal.metastorage.server.Value;
-import org.apache.ignite.lang.IgniteInternalException;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
-import org.rocksdb.RocksDBException;
-import org.rocksdb.RocksIterator;
import static org.apache.ignite.internal.metastorage.server.Value.TOMBSTONE;
@@ -181,87 +178,4 @@ class RocksStorageUtils {
return result;
}
-
- /**
- * Iterates over the given iterator passing key-value pairs to the given consumer and
- * checks the iterator's status afterwards.
- *
- * @param iterator Iterator.
- * @param consumer Consumer of key-value pairs.
- * @throws RocksDBException If failed.
- */
- static void forEach(RocksIterator iterator, RocksBiConsumer consumer) throws RocksDBException {
- for (; iterator.isValid(); iterator.next())
- consumer.accept(iterator.key(), iterator.value());
-
- checkIterator(iterator);
- }
-
- /**
- * Iterates over the given iterator testing key-value pairs with the given predicate and checks
- * the iterator's status afterwards.
- *
- * @param iterator Iterator.
- * @param consumer Consumer of key-value pairs.
- * @return {@code true} if a matching key-value pair has been found, {@code false} otherwise.
- * @throws RocksDBException If failed.
- */
- static boolean find(RocksIterator iterator, RocksBiPredicate consumer) throws RocksDBException {
- for (; iterator.isValid(); iterator.next()) {
- boolean result = consumer.test(iterator.key(), iterator.value());
-
- if (result)
- return true;
- }
-
- checkIterator(iterator);
-
- return false;
- }
-
- /**
- * Checks the status of the iterator and throws an exception if it is not correct.
- *
- * @param it RocksDB iterator.
- * @throws IgniteInternalException if the iterator has an incorrect status.
- */
- static void checkIterator(RocksIterator it) {
- try {
- it.status();
- }
- catch (RocksDBException e) {
- throw new IgniteInternalException(e);
- }
- }
-
- /**
- * BiConsumer that can throw {@link RocksDBException}.
- */
- @FunctionalInterface
- interface RocksBiConsumer {
- /**
- * Accepts the key and the value of the entry.
- *
- * @param key Key.
- * @param value Value.
- * @throws RocksDBException If failed to process the key-value pair.
- */
- void accept(byte[] key, byte[] value) throws RocksDBException;
- }
-
- /**
- * BiPredicate that can throw {@link RocksDBException}.
- */
- @FunctionalInterface
- interface RocksBiPredicate {
- /**
- * Evaluates the predicate on the given key and the given value.
- *
- * @param key Key.
- * @param value Value.
- * @return {@code true} if the input argument matches the predicate, otherwise {@code false}.
- * @throws RocksDBException If failed to test the key-value pair.
- */
- boolean test(byte[] key, byte[] value) throws RocksDBException;
- }
}
diff --git a/modules/metastorage-server/src/main/java/org/apache/ignite/internal/metastorage/server/persistence/WatchCursor.java b/modules/metastorage-server/src/main/java/org/apache/ignite/internal/metastorage/server/persistence/WatchCursor.java
index 90ff5f6..54a790f 100644
--- a/modules/metastorage-server/src/main/java/org/apache/ignite/internal/metastorage/server/persistence/WatchCursor.java
+++ b/modules/metastorage-server/src/main/java/org/apache/ignite/internal/metastorage/server/persistence/WatchCursor.java
@@ -25,6 +25,7 @@ import org.apache.ignite.internal.metastorage.server.Entry;
import org.apache.ignite.internal.metastorage.server.EntryEvent;
import org.apache.ignite.internal.metastorage.server.Value;
import org.apache.ignite.internal.metastorage.server.WatchEvent;
+import org.apache.ignite.internal.rocksdb.RocksUtils;
import org.apache.ignite.internal.util.Cursor;
import org.apache.ignite.internal.util.IgniteUtils;
import org.apache.ignite.lang.IgniteInternalException;
@@ -35,9 +36,9 @@ import org.rocksdb.RocksDBException;
import org.rocksdb.RocksIterator;
import static org.apache.ignite.internal.metastorage.server.persistence.RocksStorageUtils.bytesToValue;
-import static org.apache.ignite.internal.metastorage.server.persistence.RocksStorageUtils.checkIterator;
import static org.apache.ignite.internal.metastorage.server.persistence.RocksStorageUtils.longToBytes;
import static org.apache.ignite.internal.metastorage.server.persistence.RocksStorageUtils.rocksKeyToBytes;
+import static org.apache.ignite.internal.rocksdb.RocksUtils.checkIterator;
/**
* Subscription on updates of entries corresponding to the given keys range (where the upper bound is unlimited)
@@ -185,7 +186,7 @@ class WatchCursor implements Cursor<WatchEvent> {
List<EntryEvent> evts = new ArrayList<>();
// Iterate over the keys of the current revision and get all matching entries.
- RocksStorageUtils.forEach(nativeIterator, (k, v) -> {
+ RocksUtils.forEach(nativeIterator, (k, v) -> {
ref.noItemsInRevision = false;
byte[] key = rocksKeyToBytes(k);
diff --git a/modules/raft-client/src/main/java/org/apache/ignite/raft/client/service/RaftGroupListener.java b/modules/raft-client/src/main/java/org/apache/ignite/raft/client/service/RaftGroupListener.java
index e186fdf..64f8edc 100644
--- a/modules/raft-client/src/main/java/org/apache/ignite/raft/client/service/RaftGroupListener.java
+++ b/modules/raft-client/src/main/java/org/apache/ignite/raft/client/service/RaftGroupListener.java
@@ -57,7 +57,7 @@ public interface RaftGroupListener {
* @param doneClo The closure to call on finish. Pass the not null exception if the snapshot has not been created or
* null on successful creation.
*/
- public void onSnapshotSave(Path path, Consumer<Throwable> doneClo);
+ void onSnapshotSave(Path path, Consumer<Throwable> doneClo);
/**
* The callback to load a snapshot.
diff --git a/modules/raft/pom.xml b/modules/raft/pom.xml
index 73ad792..1682f5e 100644
--- a/modules/raft/pom.xml
+++ b/modules/raft/pom.xml
@@ -84,6 +84,12 @@
</dependency>
<dependency>
+ <groupId>org.junit.jupiter</groupId>
+ <artifactId>junit-jupiter-params</artifactId>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-junit-jupiter</artifactId>
<scope>test</scope>
diff --git a/modules/metastorage-client/src/integrationTest/java/org/apache/ignite/internal/metastorage/client/ITMetaStorageServicePersistenceTest.java b/modules/raft/src/integrationTest/java/org/apache/ignite/raft/client/service/ITAbstractListenerSnapshotTest.java
similarity index 55%
copy from modules/metastorage-client/src/integrationTest/java/org/apache/ignite/internal/metastorage/client/ITMetaStorageServicePersistenceTest.java
copy to modules/raft/src/integrationTest/java/org/apache/ignite/raft/client/service/ITAbstractListenerSnapshotTest.java
index fc5632b..0f55992 100644
--- a/modules/metastorage-client/src/integrationTest/java/org/apache/ignite/internal/metastorage/client/ITMetaStorageServicePersistenceTest.java
+++ b/modules/raft/src/integrationTest/java/org/apache/ignite/raft/client/service/ITAbstractListenerSnapshotTest.java
@@ -15,30 +15,21 @@
* limitations under the License.
*/
-package org.apache.ignite.internal.metastorage.client;
+package org.apache.ignite.raft.client.service;
import java.net.InetAddress;
import java.net.UnknownHostException;
-import java.nio.charset.StandardCharsets;
import java.nio.file.Path;
import java.util.ArrayList;
-import java.util.Arrays;
import java.util.List;
-import java.util.UUID;
-import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.function.BooleanSupplier;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
-import org.apache.ignite.internal.metastorage.server.KeyValueStorage;
-import org.apache.ignite.internal.metastorage.server.persistence.RocksDBKeyValueStorage;
-import org.apache.ignite.internal.metastorage.server.raft.MetaStorageListener;
import org.apache.ignite.internal.raft.server.impl.JRaftServerImpl;
-import org.apache.ignite.internal.raft.server.impl.JRaftServerImpl.DelegatingStateMachine;
import org.apache.ignite.internal.testframework.WorkDirectory;
import org.apache.ignite.internal.testframework.WorkDirectoryExtension;
import org.apache.ignite.internal.util.IgniteUtils;
-import org.apache.ignite.lang.ByteArray;
import org.apache.ignite.network.ClusterService;
import org.apache.ignite.network.ClusterServiceFactory;
import org.apache.ignite.network.MessageSerializationRegistryImpl;
@@ -48,24 +39,23 @@ import org.apache.ignite.network.scalecube.TestScaleCubeClusterServiceFactory;
import org.apache.ignite.network.serialization.MessageSerializationRegistry;
import org.apache.ignite.raft.client.Peer;
import org.apache.ignite.raft.client.message.RaftClientMessagesFactory;
-import org.apache.ignite.raft.client.service.RaftGroupService;
import org.apache.ignite.raft.client.service.impl.RaftGroupServiceImpl;
import org.apache.ignite.utils.ClusterServiceTestUtils;
import org.junit.jupiter.api.AfterEach;
-import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.extension.ExtendWith;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.MethodSource;
-import static java.lang.Thread.sleep;
-import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCondition;
import static org.junit.jupiter.api.Assertions.assertTrue;
/**
- * Persistent (rocksdb-based) meta storage client tests.
+ * Base class for persistent raft group's snapshots tests.
+ *
+ * @param <T> Type of the raft group listener.
*/
@ExtendWith(WorkDirectoryExtension.class)
-public class ITMetaStorageServicePersistenceTest {
+public abstract class ITAbstractListenerSnapshotTest<T extends RaftGroupListener> {
/** Starting server port. */
private static final int PORT = 5003;
@@ -80,9 +70,6 @@ public class ITMetaStorageServicePersistenceTest {
.map(Peer::new)
.collect(Collectors.toUnmodifiableList());
- /** */
- private static final String METASTORAGE_RAFT_GROUP_NAME = "METASTORAGE_RAFT_GROUP";
-
/** Factory. */
private static final RaftClientMessagesFactory FACTORY = new RaftClientMessagesFactory();
@@ -126,21 +113,27 @@ public class ITMetaStorageServicePersistenceTest {
* Test parameters for {@link #testSnapshot}.
*/
private static class TestData {
- /** Delete raft group folder. */
+ /**
+ * {@code true} if the raft group's persistence must be cleared before the follower's restart,
+ * {@code false} otherwise.
+ */
private final boolean deleteFolder;
- /** Write to meta storage after a snapshot. */
- private final boolean writeAfterSnapshot;
+ /**
+ * {@code true} if the test should interact with the raft group after a snapshot has been captured.
+ * In this case, the follower node should catch up with the leader using the raft log.
+ */
+ private final boolean interactAfterSnapshot;
/** */
- private TestData(boolean deleteFolder, boolean writeAfterSnapshot) {
+ private TestData(boolean deleteFolder, boolean interactAfterSnapshot) {
this.deleteFolder = deleteFolder;
- this.writeAfterSnapshot = writeAfterSnapshot;
+ this.interactAfterSnapshot = interactAfterSnapshot;
}
/** {@inheritDoc} */
@Override public String toString() {
- return String.format("deleteFolder=%s, writeAfterSnapshot=%s", deleteFolder, writeAfterSnapshot);
+ return String.format("deleteFolder=%s, interactAfterSnapshot=%s", deleteFolder, interactAfterSnapshot);
}
}
@@ -164,33 +157,23 @@ public class ITMetaStorageServicePersistenceTest {
*/
@ParameterizedTest
@MethodSource("testSnapshotData")
- @Disabled("https://issues.apache.org/jira/browse/IGNITE-15298")
public void testSnapshot(TestData testData) throws Exception {
- ByteArray firstKey = ByteArray.fromString("first");
- byte[] firstValue = "firstValue".getBytes(StandardCharsets.UTF_8);
-
- // Setup a metastorage raft service
- RaftGroupService metaStorageSvc = prepareMetaStorage();
-
- MetaStorageServiceImpl metaStorage = new MetaStorageServiceImpl(metaStorageSvc, null);
-
- // Put some data in the metastorage
- metaStorage.put(firstKey, firstValue).get();
+ // Set up a raft group service
+ RaftGroupService service = prepareRaftGroup();
- // Check that data has been written successfully
- check(metaStorage, new EntryImpl(firstKey, firstValue, 1, 1));;
+ beforeFollowerStop(service);
// Select any node that is not the leader of the group
JRaftServerImpl toStop = servers.stream()
- .filter(server -> !server.localPeer(METASTORAGE_RAFT_GROUP_NAME).equals(metaStorageSvc.leader()))
+ .filter(server -> !server.localPeer(raftGroupId()).equals(service.leader()))
.findAny()
.orElseThrow();
// Get the path to that node's raft directory
- Path serverDataPath = toStop.getServerDataPath(METASTORAGE_RAFT_GROUP_NAME);
+ Path serverDataPath = toStop.getServerDataPath(raftGroupId());
// Get the path to that node's RocksDB key-value storage
- Path dbPath = getStorage(toStop).getDbPath();
+ Path dbPath = getListenerPersistencePath(getListener(toStop, raftGroupId()));
int stopIdx = servers.indexOf(toStop);
@@ -200,127 +183,106 @@ public class ITMetaStorageServicePersistenceTest {
// Shutdown that node
toStop.stop();
- // Create a raft snapshot of the metastorage service
- metaStorageSvc.snapshot(metaStorageSvc.leader()).get();
+ // Create a snapshot of the raft group
+ service.snapshot(service.leader()).get();
- // Remove the first key from the metastorage
- metaStorage.remove(firstKey).get();
-
- // Check that data has been removed
- check(metaStorage, new EntryImpl(firstKey, null, 2, 2));
-
- // Put same data again
- metaStorage.put(firstKey, firstValue).get();
-
- // Check that it has been written
- check(metaStorage, new EntryImpl(firstKey, firstValue, 3, 3));
+ afterFollowerStop(service);
// Create another raft snapshot
- metaStorageSvc.snapshot(metaStorageSvc.leader()).get();
-
- byte[] lastKey = firstKey.bytes();
- byte[] lastValue = firstValue;
+ service.snapshot(service.leader()).get();
if (testData.deleteFolder) {
- // Delete stopped node's raft directory and key-value storage directory
+ // Delete the stopped node's raft directory and its listener's persistence
// to check if snapshot could be restored by the restarted node
IgniteUtils.deleteIfExists(dbPath);
IgniteUtils.deleteIfExists(serverDataPath);
}
- if (testData.writeAfterSnapshot) {
- // Put new data after the second snapshot to check if after
- // the snapshot restore operation restarted node will receive it
- ByteArray secondKey = ByteArray.fromString("second");
- byte[] secondValue = "secondValue".getBytes(StandardCharsets.UTF_8);
-
- metaStorage.put(secondKey, secondValue).get();
-
- lastKey = secondKey.bytes();
- lastValue = secondValue;
+ if (testData.interactAfterSnapshot) {
+ // Interact with the raft group after the second snapshot to check if the restarted node would see these
+ // interactions after restoring a snapshot and raft logs
+ afterSnapshot(service);
}
// Restart the node
- JRaftServerImpl restarted = startServer(stopIdx, new RocksDBKeyValueStorage(dbPath));
+ JRaftServerImpl restarted = startServer(stopIdx);
assertTrue(waitForTopology(cluster.get(0), servers.size(), 3_000));
- KeyValueStorage storage = getStorage(restarted);
+ BooleanSupplier closure = snapshotCheckClosure(restarted, testData.interactAfterSnapshot);
- byte[] finalLastKey = lastKey;
-
- int expectedRevision = testData.writeAfterSnapshot ? 4 : 3;
- int expectedUpdateCounter = testData.writeAfterSnapshot ? 4 : 3;
-
- EntryImpl expectedLastEntry = new EntryImpl(new ByteArray(lastKey), lastValue, expectedRevision, expectedUpdateCounter);
-
- // Wait until the snapshot is restored
- boolean success = waitForCondition(() -> {
- org.apache.ignite.internal.metastorage.server.Entry e = storage.get(finalLastKey);
- return e.empty() == expectedLastEntry.empty()
- && e.tombstone() == expectedLastEntry.tombstone()
- && e.revision() == expectedLastEntry.revision()
- && e.updateCounter() == expectedLastEntry.revision()
- && Arrays.equals(e.key(), expectedLastEntry.key().bytes())
- && Arrays.equals(e.value(), expectedLastEntry.value());
- }, 3_000);
+ boolean success = waitForCondition(closure, 10_000);
assertTrue(success);
-
- // Check that the last value has been written successfully
- check(metaStorage, expectedLastEntry);
}
/**
- * Get the meta store's key-value storage of the jraft server.
+ * Interacts with the raft group before a follower is stopped.
*
- * @param server Server.
- * @return Meta store's key value storage.
+ * @param service Raft group service.
+ * @throws Exception If failed.
*/
- private static RocksDBKeyValueStorage getStorage(JRaftServerImpl server) {
- org.apache.ignite.raft.jraft.RaftGroupService svc = server.raftGroupService(METASTORAGE_RAFT_GROUP_NAME);
+ public abstract void beforeFollowerStop(RaftGroupService service) throws Exception;
- DelegatingStateMachine fsm = (DelegatingStateMachine) svc.getRaftNode().getOptions().getFsm();
+ /**
+ * Interacts with the raft group after a follower is stopped.
+ *
+ * @param service Raft group service.
+ * @throws Exception If failed.
+ */
+ public abstract void afterFollowerStop(RaftGroupService service) throws Exception;
- MetaStorageListener listener = (MetaStorageListener) fsm.getListener();
+ /**
+ * Interacts with a raft group after the leader has captured a snapshot.
+ *
+ * @param service Raft group service.
+ * @throws Exception If failed.
+ */
+ public abstract void afterSnapshot(RaftGroupService service) throws Exception;
- KeyValueStorage storage = listener.getStorage();
+ /**
+ * Creates a closure that will be executed periodically to check if the snapshot and (depending on
+ * {@link TestData#interactAfterSnapshot}) the raft log were successfully restored by the follower node.
+ *
+ * @param restarted Restarted follower node.
+ * @param interactedAfterSnapshot {@code true} if the raft group was interacted with after the snapshot operation.
+ * @return Closure.
+ */
+ public abstract BooleanSupplier snapshotCheckClosure(JRaftServerImpl restarted, boolean interactedAfterSnapshot);
- return (RocksDBKeyValueStorage) storage;
- }
+ /**
+ * @param listener Raft group listener.
+ * @return Path to the group's persistence.
+ */
+ public abstract Path getListenerPersistencePath(T listener);
/**
- * Check meta storage entry.
+ * Creates raft group listener.
*
- * @param metaStorage Meta storage service.
- * @param expected Expected entry.
- * @throws ExecutionException If failed.
- * @throws InterruptedException If failed.
+ * @param workDir Work directory.
+ * @return Raft group listener.
*/
- private void check(MetaStorageServiceImpl metaStorage, EntryImpl expected)
- throws ExecutionException, InterruptedException {
- Entry entry = metaStorage.get(expected.key()).get();
-
- assertEquals(expected, entry);
- }
+ public abstract RaftGroupListener createListener(Path workDir);
- /** */
- @SuppressWarnings("BusyWait") private static boolean waitForCondition(BooleanSupplier cond, long timeout) {
- long stop = System.currentTimeMillis() + timeout;
+ /**
+ * @return Raft group id for tests.
+ */
+ public abstract String raftGroupId();
- while (System.currentTimeMillis() < stop) {
- if (cond.getAsBoolean())
- return true;
+ /**
+ * Get the raft group listener from the jraft server.
+ *
+ * @param server Server.
+ * @param grpId Raft group id.
+ * @return Raft group listener.
+ */
+ protected T getListener(JRaftServerImpl server, String grpId) {
+ org.apache.ignite.raft.jraft.RaftGroupService svc = server.raftGroupService(grpId);
- try {
- sleep(50);
- }
- catch (InterruptedException e) {
- return false;
- }
- }
+ JRaftServerImpl.DelegatingStateMachine fsm =
+ (JRaftServerImpl.DelegatingStateMachine) svc.getRaftNode().getOptions().getFsm();
- return false;
+ return (T) fsm.getListener();
}
/**
@@ -329,7 +291,7 @@ public class ITMetaStorageServicePersistenceTest {
* @param timeout The timeout in millis.
* @return {@code True} if topology size is equal to expected.
*/
- private boolean waitForTopology(ClusterService cluster, int exp, int timeout) {
+ private boolean waitForTopology(ClusterService cluster, int exp, int timeout) throws InterruptedException {
return waitForCondition(() -> cluster.topologyService().allMembers().size() >= exp, timeout);
}
@@ -370,10 +332,9 @@ public class ITMetaStorageServicePersistenceTest {
* Starts a raft server.
*
* @param idx Server index (affects port of the server).
- * @param storage KeyValueStorage for the MetaStorage.
* @return Server.
*/
- private JRaftServerImpl startServer(int idx, KeyValueStorage storage) {
+ private JRaftServerImpl startServer(int idx) {
var addr = new NetworkAddress(getLocalAddress(), PORT);
ClusterService service = clusterService("server" + idx, PORT + idx, addr);
@@ -391,8 +352,8 @@ public class ITMetaStorageServicePersistenceTest {
server.start();
server.startRaftGroup(
- METASTORAGE_RAFT_GROUP_NAME,
- new MetaStorageListener(storage),
+ raftGroupId(),
+ createListener(workDir),
INITIAL_CONF
);
@@ -402,25 +363,21 @@ public class ITMetaStorageServicePersistenceTest {
}
/**
- * Prepares meta storage by instantiating corresponding raft server with {@link MetaStorageListener} and
- * a client.
+ * Prepares raft group service by instantiating raft servers and a client.
*
- * @return Meta storage raft group service instance.
- * @throws Exception If failed.
+ * @return Raft group service instance.
*/
- private RaftGroupService prepareMetaStorage() throws Exception {
+ private RaftGroupService prepareRaftGroup() throws Exception {
for (int i = 0; i < INITIAL_CONF.size(); i++)
- startServer(i, new RocksDBKeyValueStorage(workDir.resolve(UUID.randomUUID().toString())));
+ startServer(i);
assertTrue(waitForTopology(cluster.get(0), servers.size(), 3_000));
- return startClient(METASTORAGE_RAFT_GROUP_NAME, new NetworkAddress(getLocalAddress(), PORT));
+ return startClient(raftGroupId(), new NetworkAddress(getLocalAddress(), PORT));
}
/**
* Starts a client with a specific address.
- *
- * @throws Exception If failed.
*/
private RaftGroupService startClient(String groupId, NetworkAddress addr) throws Exception {
ClusterService clientNode = clusterService("client_" + groupId + "_", CLIENT_PORT + clients.size(), addr);
diff --git a/modules/storage-rocksdb/pom.xml b/modules/rocksdb-common/pom.xml
similarity index 66%
copy from modules/storage-rocksdb/pom.xml
copy to modules/rocksdb-common/pom.xml
index 84ee9ad..a05ce57 100644
--- a/modules/storage-rocksdb/pom.xml
+++ b/modules/rocksdb-common/pom.xml
@@ -29,13 +29,13 @@
<relativePath>../../parent/pom.xml</relativePath>
</parent>
- <artifactId>ignite-storage-rocksdb</artifactId>
+ <artifactId>ignite-rocksdb-common</artifactId>
<version>3.0.0-SNAPSHOT</version>
<dependencies>
<dependency>
<groupId>org.apache.ignite</groupId>
- <artifactId>ignite-storage-api</artifactId>
+ <artifactId>ignite-core</artifactId>
</dependency>
<!-- 3rd party dependencies -->
@@ -43,32 +43,5 @@
<groupId>org.rocksdb</groupId>
<artifactId>rocksdbjni</artifactId>
</dependency>
-
- <!-- Test dependencies -->
- <dependency>
- <groupId>org.hamcrest</groupId>
- <artifactId>hamcrest-library</artifactId>
- <scope>test</scope>
- </dependency>
-
- <dependency>
- <groupId>org.junit.jupiter</groupId>
- <artifactId>junit-jupiter-engine</artifactId>
- <scope>test</scope>
- </dependency>
-
- <dependency>
- <groupId>org.apache.ignite</groupId>
- <artifactId>ignite-core</artifactId>
- <type>test-jar</type>
- <scope>test</scope>
- </dependency>
-
- <dependency>
- <groupId>org.apache.ignite</groupId>
- <artifactId>ignite-storage-api</artifactId>
- <type>test-jar</type>
- <scope>test</scope>
- </dependency>
</dependencies>
</project>
diff --git a/modules/metastorage-server/src/main/java/org/apache/ignite/internal/metastorage/server/persistence/ColumnFamily.java b/modules/rocksdb-common/src/main/java/org/apache/ignite/internal/rocksdb/ColumnFamily.java
similarity index 73%
rename from modules/metastorage-server/src/main/java/org/apache/ignite/internal/metastorage/server/persistence/ColumnFamily.java
rename to modules/rocksdb-common/src/main/java/org/apache/ignite/internal/rocksdb/ColumnFamily.java
index 4052aeb..fcdc676 100644
--- a/modules/metastorage-server/src/main/java/org/apache/ignite/internal/metastorage/server/persistence/ColumnFamily.java
+++ b/modules/rocksdb-common/src/main/java/org/apache/ignite/internal/rocksdb/ColumnFamily.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.ignite.internal.metastorage.server.persistence;
+package org.apache.ignite.internal.rocksdb;
import java.util.List;
import org.apache.ignite.internal.util.IgniteUtils;
@@ -34,12 +34,12 @@ import org.rocksdb.WriteBatch;
/**
* Wrapper for the column family that encapsulates {@link ColumnFamilyHandle} and RocksDB's operations with it.
*/
-class ColumnFamily implements AutoCloseable {
+public class ColumnFamily implements AutoCloseable {
/** RocksDB instance. */
private final RocksDB db;
- /** Column family type. */
- private final StorageColumnFamilyType cfType;
+ /** Column family name. */
+ private final String cfName;
/** Column family handle. */
private final ColumnFamilyHandle cfHandle;
@@ -55,20 +55,20 @@ class ColumnFamily implements AutoCloseable {
*
* @param db Db.
* @param handle Column family handle.
- * @param cfType Column family type.
+ * @param cfName Column family name.
* @param cfOptions Column family options.
* @param options Options for the column family options.
* @throws RocksDBException If failed.
*/
- ColumnFamily(
+ public ColumnFamily(
RocksDB db,
ColumnFamilyHandle handle,
- StorageColumnFamilyType cfType,
+ String cfName,
ColumnFamilyOptions cfOptions,
Options options
) {
this.db = db;
- this.cfType = cfType;
+ this.cfName = cfName;
this.cfOptions = cfOptions;
this.options = options;
this.cfHandle = handle;
@@ -87,11 +87,23 @@ class ColumnFamily implements AutoCloseable {
* @throws RocksDBException If failed.
* @see RocksDB#get(ColumnFamilyHandle, byte[])
*/
- byte @Nullable [] get(byte @NotNull [] key) throws RocksDBException {
+ public byte @Nullable [] get(byte @NotNull [] key) throws RocksDBException {
return db.get(cfHandle, key);
}
/**
+ * Puts a key-value pair into this column family.
+ *
+ * @param key Key.
+ * @param value Value.
+ * @throws RocksDBException If failed.
+ * @see RocksDB#put(ColumnFamilyHandle, byte[], byte[])
+ */
+ public void put(byte @NotNull [] key, byte @NotNull [] value) throws RocksDBException {
+ db.put(cfHandle, key, value);
+ }
+
+ /**
* Puts a key-value pair into this column family within the write batch.
*
* @param batch Write batch.
@@ -100,11 +112,22 @@ class ColumnFamily implements AutoCloseable {
* @throws RocksDBException If failed.
* @see WriteBatch#put(ColumnFamilyHandle, byte[], byte[])
*/
- void put(WriteBatch batch, byte @NotNull [] key, byte @NotNull [] value) throws RocksDBException {
+ public void put(WriteBatch batch, byte @NotNull [] key, byte @NotNull [] value) throws RocksDBException {
batch.put(cfHandle, key, value);
}
/**
+ * Deletes the entry mapped by the key and associated with this column family.
+ *
+ * @param key Key.
+ * @throws RocksDBException If failed.
+ * @see RocksDB#delete(ColumnFamilyHandle, byte[])
+ */
+ public void delete(byte @NotNull [] key) throws RocksDBException {
+ db.delete(cfHandle, key);
+ }
+
+ /**
* Deletes the entry mapped by the key and associated with this column family within the write batch.
*
* @param batch Write batch.
@@ -112,7 +135,7 @@ class ColumnFamily implements AutoCloseable {
* @throws RocksDBException If failed.
* @see WriteBatch#delete(ColumnFamilyHandle, byte[])
*/
- void delete(WriteBatch batch, byte @NotNull [] key) throws RocksDBException {
+ public void delete(WriteBatch batch, byte @NotNull [] key) throws RocksDBException {
batch.delete(cfHandle, key);
}
@@ -122,7 +145,7 @@ class ColumnFamily implements AutoCloseable {
* @return Iterator.
* @see RocksDB#newIterator(ColumnFamilyHandle)
*/
- RocksIterator newIterator() {
+ public RocksIterator newIterator() {
return db.newIterator(cfHandle);
}
@@ -133,7 +156,7 @@ class ColumnFamily implements AutoCloseable {
* @return Iterator.
* @see RocksDB#newIterator(ColumnFamilyHandle, ReadOptions)
*/
- RocksIterator newIterator(ReadOptions options) {
+ public RocksIterator newIterator(ReadOptions options) {
return db.newIterator(cfHandle, options);
}
@@ -145,14 +168,14 @@ class ColumnFamily implements AutoCloseable {
* @throws RocksDBException If failed.
* @see RocksDB#ingestExternalFile(ColumnFamilyHandle, List, IngestExternalFileOptions)
*/
- void ingestExternalFile(List<String> paths, IngestExternalFileOptions options) throws RocksDBException {
+ public void ingestExternalFile(List<String> paths, IngestExternalFileOptions options) throws RocksDBException {
db.ingestExternalFile(cfHandle, paths, options);
}
/**
* @return Name of the column family.
*/
- String name() {
- return cfType.name();
+ public String name() {
+ return cfName;
}
}
diff --git a/modules/rocksdb-common/src/main/java/org/apache/ignite/internal/rocksdb/RocksBiConsumer.java b/modules/rocksdb-common/src/main/java/org/apache/ignite/internal/rocksdb/RocksBiConsumer.java
new file mode 100644
index 0000000..fabbdfb
--- /dev/null
+++ b/modules/rocksdb-common/src/main/java/org/apache/ignite/internal/rocksdb/RocksBiConsumer.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.rocksdb;
+
+import org.rocksdb.RocksDBException;
+
+/**
+ * BiConsumer that can throw {@link RocksDBException}.
+ */
+@FunctionalInterface
+public interface RocksBiConsumer {
+ /**
+ * Accepts the key and the value of the entry.
+ *
+ * @param key Key.
+ * @param value Value.
+ * @throws RocksDBException If failed to process the key-value pair.
+ */
+ void accept(byte[] key, byte[] value) throws RocksDBException;
+}
diff --git a/modules/rocksdb-common/src/main/java/org/apache/ignite/internal/rocksdb/RocksBiPredicate.java b/modules/rocksdb-common/src/main/java/org/apache/ignite/internal/rocksdb/RocksBiPredicate.java
new file mode 100644
index 0000000..3525b34
--- /dev/null
+++ b/modules/rocksdb-common/src/main/java/org/apache/ignite/internal/rocksdb/RocksBiPredicate.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.rocksdb;
+
+import org.rocksdb.RocksDBException;
+
+/**
+ * BiPredicate that can throw {@link RocksDBException}.
+ */
+@FunctionalInterface
+public interface RocksBiPredicate {
+ /**
+ * Evaluates the predicate on the given key and the given value.
+ *
+ * @param key Key.
+ * @param value Value.
+ * @return {@code true} if the input argument matches the predicate, otherwise {@code false}.
+ * @throws RocksDBException If failed to test the key-value pair.
+ */
+ boolean test(byte[] key, byte[] value) throws RocksDBException;
+}
diff --git a/modules/rocksdb-common/src/main/java/org/apache/ignite/internal/rocksdb/RocksUtils.java b/modules/rocksdb-common/src/main/java/org/apache/ignite/internal/rocksdb/RocksUtils.java
new file mode 100644
index 0000000..f9acbb4
--- /dev/null
+++ b/modules/rocksdb-common/src/main/java/org/apache/ignite/internal/rocksdb/RocksUtils.java
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.rocksdb;
+
+import java.nio.file.Path;
+import org.apache.ignite.lang.IgniteInternalException;
+import org.rocksdb.EnvOptions;
+import org.rocksdb.Options;
+import org.rocksdb.ReadOptions;
+import org.rocksdb.RocksDBException;
+import org.rocksdb.RocksIterator;
+import org.rocksdb.Snapshot;
+import org.rocksdb.SstFileWriter;
+
+/**
+ * RocksDB utility functions.
+ */
+public class RocksUtils {
+ /**
+ * Creates an SST file for the column family.
+ *
+ * @param columnFamily Column family.
+ * @param snapshot Point-in-time snapshot.
+ * @param path Directory to put the SST file in.
+ */
+ public static void createSstFile(
+ ColumnFamily columnFamily,
+ Snapshot snapshot,
+ Path path
+ ) {
+ try (
+ EnvOptions envOptions = new EnvOptions();
+ Options options = new Options();
+ ReadOptions readOptions = new ReadOptions().setSnapshot(snapshot);
+ RocksIterator it = columnFamily.newIterator(readOptions);
+ SstFileWriter sstFileWriter = new SstFileWriter(envOptions, options)
+ ) {
+ Path sstFile = path.resolve(columnFamily.name());
+
+ sstFileWriter.open(sstFile.toString());
+
+ it.seekToFirst();
+
+ forEach(it, sstFileWriter::put);
+
+ sstFileWriter.finish();
+ }
+ catch (Throwable t) {
+ throw new IgniteInternalException("Failed to write snapshot: " + t.getMessage(), t);
+ }
+ }
+
+ /**
+ * Iterates over the given iterator passing key-value pairs to the given consumer and
+ * checks the iterator's status afterwards.
+ *
+ * @param iterator Iterator.
+ * @param consumer Consumer of key-value pairs.
+ * @throws RocksDBException If the consumer failed to process a key-value pair.
+ */
+ public static void forEach(RocksIterator iterator, RocksBiConsumer consumer) throws RocksDBException {
+ for (; iterator.isValid(); iterator.next())
+ consumer.accept(iterator.key(), iterator.value());
+
+ checkIterator(iterator);
+ }
+
+ /**
+ * Iterates over the given iterator testing key-value pairs with the given predicate and checks
+ * the iterator's status afterwards.
+ *
+ * @param iterator Iterator.
+ * @param consumer Predicate to test key-value pairs with.
+ * @return {@code true} if a matching key-value pair has been found, {@code false} otherwise.
+ * @throws RocksDBException If the predicate failed to test a key-value pair.
+ */
+ public static boolean find(RocksIterator iterator, RocksBiPredicate consumer) throws RocksDBException {
+ for (; iterator.isValid(); iterator.next()) {
+ boolean result = consumer.test(iterator.key(), iterator.value());
+
+ if (result)
+ return true;
+ }
+
+ checkIterator(iterator);
+
+ return false;
+ }
+
+ /**
+ * Checks the status of the iterator and throws an exception if it is not correct.
+ *
+ * @param it RocksDB iterator.
+ * @throws IgniteInternalException if the iterator has an incorrect status.
+ */
+ public static void checkIterator(RocksIterator it) {
+ try {
+ it.status();
+ }
+ catch (RocksDBException e) {
+ throw new IgniteInternalException(e);
+ }
+ }
+}
diff --git a/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/InvokeClosure.java b/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/InvokeClosure.java
index 67dd309..589a589 100644
--- a/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/InvokeClosure.java
+++ b/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/InvokeClosure.java
@@ -51,7 +51,7 @@ public interface InvokeClosure<T> {
/**
* @return Operation type for this closure or {@code null} if it is unknown.
* After method {@link #call(DataRow)} has been called, operation type must
- * be know and this method can not return {@code null}.
+ * be known and this method can not return {@code null}.
*/
@Nullable OperationType operationType();
}
diff --git a/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/Storage.java b/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/Storage.java
index 0778e74..80e4e02 100644
--- a/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/Storage.java
+++ b/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/Storage.java
@@ -17,9 +17,12 @@
package org.apache.ignite.internal.storage;
+import java.nio.file.Path;
import java.util.Collection;
+import java.util.concurrent.CompletableFuture;
import java.util.function.Predicate;
import org.apache.ignite.internal.util.Cursor;
+import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
/**
@@ -117,5 +120,21 @@ public interface Storage extends AutoCloseable {
* @throws StorageException If failed to read data or storage is already stopped.
*/
public Cursor<DataRow> scan(Predicate<SearchRow> filter) throws StorageException;
+
+ /**
+ * Creates a snapshot of the storage's current state in the specified directory.
+ *
+ * @param snapshotPath Directory to store a snapshot.
+ * @return Future representing pending completion of the operation. Can not be {@code null}.
+ */
+ @NotNull
+ CompletableFuture<Void> snapshot(Path snapshotPath);
+
+ /**
+ * Restores the state of the storage that was previously captured with {@link #snapshot(Path)}.
+ *
+ * @param snapshotPath Path to the snapshot's directory.
+ */
+ void restoreSnapshot(Path snapshotPath);
}
diff --git a/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/basic/ConcurrentHashMapStorage.java b/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/basic/ConcurrentHashMapStorage.java
index 9b8d820..3793c35 100644
--- a/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/basic/ConcurrentHashMapStorage.java
+++ b/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/basic/ConcurrentHashMapStorage.java
@@ -17,10 +17,12 @@
package org.apache.ignite.internal.storage.basic;
+import java.nio.file.Path;
import java.util.Arrays;
import java.util.Collection;
import java.util.Iterator;
import java.util.Objects;
+import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.function.Predicate;
@@ -170,6 +172,16 @@ public class ConcurrentHashMapStorage implements Storage {
}
/** {@inheritDoc} */
+ @Override public @NotNull CompletableFuture<Void> snapshot(Path snapshotPath) {
+ throw new UnsupportedOperationException("Not implemented!");
+ }
+
+ /** {@inheritDoc} */
+ @Override public void restoreSnapshot(Path snapshotPath) {
+ throw new UnsupportedOperationException("Not implemented!");
+ }
+
+ /** {@inheritDoc} */
@Override public void close() throws Exception {
// No-op.
}
diff --git a/modules/storage-rocksdb/pom.xml b/modules/storage-rocksdb/pom.xml
index 84ee9ad..5c3a01a 100644
--- a/modules/storage-rocksdb/pom.xml
+++ b/modules/storage-rocksdb/pom.xml
@@ -38,10 +38,9 @@
<artifactId>ignite-storage-api</artifactId>
</dependency>
- <!-- 3rd party dependencies -->
<dependency>
- <groupId>org.rocksdb</groupId>
- <artifactId>rocksdbjni</artifactId>
+ <groupId>org.apache.ignite</groupId>
+ <artifactId>ignite-rocksdb-common</artifactId>
</dependency>
<!-- Test dependencies -->
diff --git a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbStorage.java b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbStorage.java
index fbb3652..71d224e 100644
--- a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbStorage.java
+++ b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbStorage.java
@@ -17,17 +17,26 @@
package org.apache.ignite.internal.storage.rocksdb;
+import java.io.IOException;
import java.nio.ByteBuffer;
+import java.nio.file.Files;
import java.nio.file.Path;
+import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
+import java.util.Collections;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
import java.util.function.Predicate;
import java.util.stream.Collectors;
+import org.apache.ignite.internal.rocksdb.ColumnFamily;
import org.apache.ignite.internal.storage.DataRow;
import org.apache.ignite.internal.storage.InvokeClosure;
import org.apache.ignite.internal.storage.SearchRow;
@@ -39,19 +48,34 @@ import org.apache.ignite.internal.util.IgniteUtils;
import org.apache.ignite.lang.IgniteInternalException;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
+import org.jetbrains.annotations.TestOnly;
import org.rocksdb.AbstractComparator;
+import org.rocksdb.ColumnFamilyDescriptor;
+import org.rocksdb.ColumnFamilyHandle;
+import org.rocksdb.ColumnFamilyOptions;
import org.rocksdb.ComparatorOptions;
+import org.rocksdb.DBOptions;
+import org.rocksdb.IngestExternalFileOptions;
import org.rocksdb.Options;
import org.rocksdb.RocksDB;
import org.rocksdb.RocksDBException;
import org.rocksdb.RocksIterator;
+import org.rocksdb.Snapshot;
import org.rocksdb.WriteBatch;
import org.rocksdb.WriteOptions;
+import static org.apache.ignite.internal.rocksdb.RocksUtils.createSstFile;
+
/**
* Storage implementation based on a single RocksDB instance.
*/
public class RocksDbStorage implements Storage {
+ /** Suffix for the temporary snapshot folder. */
+ private static final String TMP_SUFFIX = ".tmp";
+
+ /** Name of the data column family, also used as the name of the snapshot file. */
+ private static final String COLUMN_FAMILY_NAME = "data";
+
static {
RocksDB.loadLibrary();
}
@@ -63,11 +87,20 @@ public class RocksDbStorage implements Storage {
private final AbstractComparator comparator;
/** RockDB options. */
- private final Options options;
+ private final DBOptions options;
/** RocksDb instance. */
private final RocksDB db;
+ /** Data column family. */
+ private final ColumnFamily data;
+
+ /** DB path. */
+ private final Path dbPath;
+
+ /** Thread-pool for snapshot operations execution. */
+ private final ExecutorService snapshotExecutor = Executors.newSingleThreadExecutor();
+
/**
* @param dbPath Path to the folder to store data.
* @param comparator Keys comparator.
@@ -75,6 +108,8 @@ public class RocksDbStorage implements Storage {
*/
public RocksDbStorage(Path dbPath, Comparator<ByteBuffer> comparator) throws StorageException {
try {
+ this.dbPath = dbPath;
+
comparatorOptions = new ComparatorOptions();
this.comparator = new AbstractComparator(comparatorOptions) {
@@ -89,13 +124,23 @@ public class RocksDbStorage implements Storage {
}
};
- options = new Options();
+ options = new DBOptions()
+ .setCreateMissingColumnFamilies(true)
+ .setCreateIfMissing(true);
- options.setCreateIfMissing(true);
+ Options dataOptions = new Options().setCreateIfMissing(true).setComparator(this.comparator);
- options.setComparator(this.comparator);
+ ColumnFamilyOptions dataFamilyOptions = new ColumnFamilyOptions(dataOptions);
- this.db = RocksDB.open(options, dbPath.toAbsolutePath().toString());
+ List<ColumnFamilyDescriptor> descriptors = Collections.singletonList(
+ new ColumnFamilyDescriptor(RocksDB.DEFAULT_COLUMN_FAMILY, dataFamilyOptions)
+ );
+
+ var handles = new ArrayList<ColumnFamilyHandle>();
+
+ this.db = RocksDB.open(options, dbPath.toAbsolutePath().toString(), descriptors, handles);
+
+ this.data = new ColumnFamily(db, handles.get(0), COLUMN_FAMILY_NAME, dataFamilyOptions, dataOptions);
}
catch (RocksDBException e) {
try {
@@ -114,7 +159,7 @@ public class RocksDbStorage implements Storage {
try {
byte[] keyBytes = key.keyBytes();
- return new SimpleDataRow(keyBytes, db.get(keyBytes));
+ return new SimpleDataRow(keyBytes, data.get(keyBytes));
}
catch (RocksDBException e) {
throw new StorageException("Failed to read data from the storage", e);
@@ -148,7 +193,11 @@ public class RocksDbStorage implements Storage {
/** {@inheritDoc} */
@Override public void write(DataRow row) throws StorageException {
try {
- db.put(row.keyBytes(), row.valueBytes());
+ byte[] value = row.valueBytes();
+
+ assert value != null;
+
+ data.put(row.keyBytes(), value);
}
catch (RocksDBException e) {
throw new StorageException("Filed to write data to the storage", e);
@@ -159,8 +208,13 @@ public class RocksDbStorage implements Storage {
@Override public void writeAll(Collection<? extends DataRow> rows) throws StorageException {
try (WriteBatch batch = new WriteBatch();
WriteOptions opts = new WriteOptions()) {
- for (DataRow row : rows)
- batch.put(row.keyBytes(), row.valueBytes());
+ for (DataRow row : rows) {
+ byte[] value = row.valueBytes();
+
+ assert value != null;
+
+ data.put(batch, row.keyBytes(), value);
+ }
db.write(opts, batch);
}
@@ -177,9 +231,13 @@ public class RocksDbStorage implements Storage {
WriteOptions opts = new WriteOptions()) {
for (DataRow row : rows) {
- if (db.get(row.keyBytes()) == null)
- batch.put(row.keyBytes(), row.valueBytes());
- else
+ if (data.get(row.keyBytes()) == null) {
+ byte[] value = row.valueBytes();
+
+ assert value != null;
+
+ data.put(batch, row.keyBytes(), value);
+ } else
cantInsert.add(row);
}
@@ -195,7 +253,7 @@ public class RocksDbStorage implements Storage {
/** {@inheritDoc} */
@Override public void remove(SearchRow key) throws StorageException {
try {
- db.delete(key.keyBytes());
+ data.delete(key.keyBytes());
}
catch (RocksDBException e) {
throw new StorageException("Failed to remove data from the storage", e);
@@ -212,12 +270,12 @@ public class RocksDbStorage implements Storage {
for (SearchRow key : keys) {
byte[] keyBytes = key.keyBytes();
- byte[] value = db.get(keyBytes);
+ byte[] value = data.get(keyBytes);
if (value != null) {
res.add(new SimpleDataRow(keyBytes, value));
- batch.delete(keyBytes);
+ data.delete(batch, keyBytes);
}
}
@@ -260,7 +318,7 @@ public class RocksDbStorage implements Storage {
if (Arrays.equals(value, expectedValue)) {
res.add(new SimpleDataRow(key, value));
- batch.delete(key);
+ data.delete(batch, key);
}
}
@@ -278,18 +336,26 @@ public class RocksDbStorage implements Storage {
@Override public <T> T invoke(SearchRow key, InvokeClosure<T> clo) throws StorageException {
try {
byte[] keyBytes = key.keyBytes();
- byte[] existingDataBytes = db.get(keyBytes);
+ byte[] existingDataBytes = data.get(keyBytes);
clo.call(new SimpleDataRow(keyBytes, existingDataBytes));
switch (clo.operationType()) {
case WRITE:
- db.put(keyBytes, clo.newRow().valueBytes());
+ DataRow newRow = clo.newRow();
+
+ assert newRow != null;
+
+ byte[] value = newRow.valueBytes();
+
+ assert value != null;
+
+ data.put(keyBytes, value);
break;
case REMOVE:
- db.delete(keyBytes);
+ data.delete(keyBytes);
break;
@@ -306,15 +372,75 @@ public class RocksDbStorage implements Storage {
/** {@inheritDoc} */
@Override public Cursor<DataRow> scan(Predicate<SearchRow> filter) throws StorageException {
- return new ScanCursor(db.newIterator(), filter);
+ return new ScanCursor(data.newIterator(), filter);
+ }
+
+ /** {@inheritDoc} */
+ @Override public @NotNull CompletableFuture<Void> snapshot(Path snapshotPath) {
+ Path tempPath = Paths.get(snapshotPath.toString() + TMP_SUFFIX);
+
+ // Create a RocksDB point-in-time snapshot
+ Snapshot snapshot = db.getSnapshot();
+
+ return CompletableFuture.runAsync(() -> {
+ // (Re)create the temporary directory
+ IgniteUtils.deleteIfExists(tempPath);
+
+ try {
+ Files.createDirectories(tempPath);
+ }
+ catch (IOException e) {
+ throw new IgniteInternalException("Failed to create directory: " + tempPath, e);
+ }
+ }, snapshotExecutor)
+ .thenRunAsync(() -> createSstFile(data, snapshot, tempPath), snapshotExecutor)
+ .whenComplete((aVoid, throwable) -> {
+ // Release a snapshot
+ db.releaseSnapshot(snapshot);
+
+ // Snapshot is not actually closed here, because a Snapshot instance doesn't own a pointer, the
+ // database does. close() is called only to maintain the AutoCloseable semantics.
+ snapshot.close();
+
+ if (throwable != null)
+ return;
+
+ // Delete snapshot directory if it already exists
+ IgniteUtils.deleteIfExists(snapshotPath);
+
+ try {
+ // Rename the temporary directory
+ Files.move(tempPath, snapshotPath);
+ }
+ catch (IOException e) {
+ throw new IgniteInternalException("Failed to rename: " + tempPath + " to " + snapshotPath, e);
+ }
+ });
+ }
+
+ /** {@inheritDoc} */
+ @Override public void restoreSnapshot(Path path) {
+ try (IngestExternalFileOptions ingestOptions = new IngestExternalFileOptions()) {
+ Path snapshotPath = path.resolve(COLUMN_FAMILY_NAME);
+
+ if (!Files.exists(snapshotPath))
+ throw new IgniteInternalException("Snapshot not found: " + snapshotPath);
+
+ this.data.ingestExternalFile(Collections.singletonList(snapshotPath.toString()), ingestOptions);
+ }
+ catch (RocksDBException e) {
+ throw new IgniteInternalException("Fail to ingest sst file at path: " + path, e);
+ }
}
/** {@inheritDoc} */
@Override public void close() throws Exception {
- IgniteUtils.closeAll(comparatorOptions, comparator, options, db);
+ IgniteUtils.shutdownAndAwaitTermination(snapshotExecutor, 10, TimeUnit.SECONDS);
+
+ IgniteUtils.closeAll(data, db, options, comparator, comparatorOptions);
}
- /** Cusror wrapper over the RocksIterator object with custom filter. */
+ /** Cursor wrapper over the RocksIterator object with custom filter. */
private static class ScanCursor implements Cursor<DataRow> {
/** Iterator from RocksDB. */
private final RocksIterator iter;
@@ -379,4 +505,12 @@ public class RocksDbStorage implements Storage {
iter.close();
}
}
+
+ /**
+ * @return Path to the database.
+ */
+ @TestOnly
+ public Path getDbPath() {
+ return dbPath;
+ }
}
diff --git a/modules/table/pom.xml b/modules/table/pom.xml
index d005e3b..d63fa39 100644
--- a/modules/table/pom.xml
+++ b/modules/table/pom.xml
@@ -145,6 +145,13 @@
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>org.apache.ignite</groupId>
+ <artifactId>ignite-raft</artifactId>
+ <scope>test</scope>
+ <type>test-jar</type>
+ </dependency>
+
<!-- Benchmarks dependencies -->
<dependency>
<groupId>org.openjdk.jmh</groupId>
diff --git a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ITTablePersistenceTest.java b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ITTablePersistenceTest.java
new file mode 100644
index 0000000..1e9dafa
--- /dev/null
+++ b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ITTablePersistenceTest.java
@@ -0,0 +1,156 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.distributed;
+
+import java.nio.ByteBuffer;
+import java.nio.file.Path;
+import java.util.Map;
+import java.util.Objects;
+import java.util.UUID;
+import java.util.function.BooleanSupplier;
+import org.apache.ignite.internal.raft.server.impl.JRaftServerImpl;
+import org.apache.ignite.internal.schema.ByteBufferRow;
+import org.apache.ignite.internal.schema.Column;
+import org.apache.ignite.internal.schema.NativeTypes;
+import org.apache.ignite.internal.schema.SchemaDescriptor;
+import org.apache.ignite.internal.schema.row.Row;
+import org.apache.ignite.internal.schema.row.RowAssembler;
+import org.apache.ignite.internal.storage.DataRow;
+import org.apache.ignite.internal.storage.basic.SimpleDataRow;
+import org.apache.ignite.internal.storage.rocksdb.RocksDbStorage;
+import org.apache.ignite.internal.table.distributed.raft.PartitionListener;
+import org.apache.ignite.internal.table.distributed.storage.InternalTableImpl;
+import org.apache.ignite.raft.client.service.ITAbstractListenerSnapshotTest;
+import org.apache.ignite.raft.client.service.RaftGroupListener;
+import org.apache.ignite.raft.client.service.RaftGroupService;
+
+/**
+ * Persistent (rocksdb-based) partitions raft group snapshots tests.
+ */
+public class ITTablePersistenceTest extends ITAbstractListenerSnapshotTest<PartitionListener> {
+ /** Test schema: a single INT64 key column "key" and a single INT64 value column "value". */
+ private static final SchemaDescriptor SCHEMA = new SchemaDescriptor(UUID.randomUUID(),
+ 1,
+ new Column[] {new Column("key", NativeTypes.INT64, false)},
+ new Column[] {new Column("value", NativeTypes.INT64, false)}
+ );
+
+ /** Key-only row for key 0. */
+ private static final Row FIRST_KEY = createKeyRow(0);
+
+ /** Row with key 0 and value 0. */
+ private static final Row FIRST_VALUE = createKeyValueRow(0, 0);
+
+ /** Key-only row for key 1. */
+ private static final Row SECOND_KEY = createKeyRow(1);
+
+ /** Row with key 1 and value 1. */
+ private static final Row SECOND_VALUE = createKeyValueRow(1, 1);
+
+ /** {@inheritDoc} */
+ @Override public void beforeFollowerStop(RaftGroupService service) throws Exception {
+ var table = new InternalTableImpl("table", UUID.randomUUID(), Map.of(0, service), 1);
+
+ table.upsert(FIRST_VALUE, null).get();
+ }
+
+ /** {@inheritDoc} */
+ @Override public void afterFollowerStop(RaftGroupService service) throws Exception {
+ var table = new InternalTableImpl("table", UUID.randomUUID(), Map.of(0, service), 1);
+
+ // Remove the first key
+ table.delete(FIRST_KEY, null).get();
+
+ // Put deleted data again
+ table.upsert(FIRST_VALUE, null).get();
+ }
+
+ /** {@inheritDoc} */
+ @Override public void afterSnapshot(RaftGroupService service) throws Exception {
+ var table = new InternalTableImpl("table", UUID.randomUUID(), Map.of(0, service), 1);
+
+ table.upsert(SECOND_VALUE, null).get();
+ }
+
+ /** {@inheritDoc} */
+ @Override public BooleanSupplier snapshotCheckClosure(JRaftServerImpl restarted, boolean interactedAfterSnapshot) {
+ RocksDbStorage storage = (RocksDbStorage) getListener(restarted, raftGroupId()).getStorage();
+
+ Row key = interactedAfterSnapshot ? SECOND_KEY : FIRST_KEY;
+ Row value = interactedAfterSnapshot ? SECOND_VALUE : FIRST_VALUE;
+
+ ByteBuffer buffer = key.keySlice();
+ byte[] keyBytes = new byte[buffer.capacity()];
+ buffer.get(keyBytes);
+
+ SimpleDataRow finalRow = new SimpleDataRow(keyBytes, null);
+ SimpleDataRow finalValue = new SimpleDataRow(keyBytes, value.bytes());
+
+ return () -> {
+ DataRow read = storage.read(finalRow);
+ return Objects.equals(finalValue, read);
+ };
+ }
+
+ /** {@inheritDoc} */
+ @Override public Path getListenerPersistencePath(PartitionListener listener) {
+ return ((RocksDbStorage) listener.getStorage()).getDbPath();
+ }
+
+ /** {@inheritDoc} */
+ @Override public RaftGroupListener createListener(Path workDir) {
+ return new PartitionListener(
+ new RocksDbStorage(workDir.resolve(UUID.randomUUID().toString()), ByteBuffer::compareTo)
+ );
+ }
+
+ /** {@inheritDoc} */
+ @Override public String raftGroupId() {
+ return "partitions";
+ }
+
+ /**
+ * Creates a {@link Row} with the supplied key.
+ *
+ * @param id Key.
+ * @return Row.
+ */
+ private static Row createKeyRow(long id) {
+ RowAssembler rowBuilder = new RowAssembler(SCHEMA, 0, 0);
+
+ rowBuilder.appendLong(id);
+
+ return new Row(SCHEMA, new ByteBufferRow(rowBuilder.toBytes()));
+ }
+
+ /**
+ * Creates a {@link Row} with the supplied key and value.
+ *
+ * @param id Key.
+ * @param value Value.
+ * @return Row.
+ */
+ private static Row createKeyValueRow(long id, long value) {
+ RowAssembler rowBuilder = new RowAssembler(SCHEMA, 0, 0);
+
+ rowBuilder.appendLong(id);
+ rowBuilder.appendLong(value);
+
+ return new Row(SCHEMA, new ByteBufferRow(rowBuilder.toBytes()));
+ }
+}
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionListener.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionListener.java
index dd37cb9..cd90d9c 100644
--- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionListener.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionListener.java
@@ -57,6 +57,7 @@ import org.apache.ignite.raft.client.WriteCommand;
import org.apache.ignite.raft.client.service.CommandClosure;
import org.apache.ignite.raft.client.service.RaftGroupListener;
import org.jetbrains.annotations.NotNull;
+import org.jetbrains.annotations.TestOnly;
/**
* Partition command handler.
@@ -301,13 +302,15 @@ public class PartitionListener implements RaftGroupListener {
/** {@inheritDoc} */
@Override public void onSnapshotSave(Path path, Consumer<Throwable> doneClo) {
- // Not implemented yet.
+ storage.snapshot(path).whenComplete((unused, throwable) -> {
+ doneClo.accept(throwable);
+ });
}
/** {@inheritDoc} */
@Override public boolean onSnapshotLoad(Path path) {
- // Not implemented yet.
- return false;
+ storage.restoreSnapshot(path);
+ return true;
}
/** {@inheritDoc} */
@@ -345,4 +348,12 @@ public class PartitionListener implements RaftGroupListener {
return new SimpleDataRow(key, null);
}
+
+ /**
+ * @return Underlying storage.
+ */
+ @TestOnly
+ public Storage getStorage() {
+ return storage;
+ }
}
diff --git a/parent/pom.xml b/parent/pom.xml
index 9fe2172..c3ac635 100644
--- a/parent/pom.xml
+++ b/parent/pom.xml
@@ -246,6 +246,12 @@
<dependency>
<groupId>org.apache.ignite</groupId>
+ <artifactId>ignite-rocksdb-common</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.ignite</groupId>
<artifactId>ignite-schema</artifactId>
<version>${project.version}</version>
</dependency>
diff --git a/pom.xml b/pom.xml
index 11cdcd5..aa83c61 100644
--- a/pom.xml
+++ b/pom.xml
@@ -59,6 +59,7 @@
<module>modules/raft</module>
<module>modules/raft-client</module>
<module>modules/rest</module>
+ <module>modules/rocksdb-common</module>
<module>modules/runner</module>
<module>modules/schema</module>
<module>modules/storage-api</module>