You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sd...@apache.org on 2022/06/30 08:10:09 UTC
[ignite-3] branch main updated: IGNITE-17198 Minimal implementation of pure in-memory storage (with in-memory RAFT)
This is an automated email from the ASF dual-hosted git repository.
sdanilov pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new fa6d23e8d IGNITE-17198 Minimal implementation of pure in-memory storage (with in-memory RAFT)
fa6d23e8d is described below
commit fa6d23e8d62b6bf076f63f0a5a9998368833bb84
Author: Roman Puchkovskiy <ro...@gmail.com>
AuthorDate: Thu Jun 30 12:10:04 2022 +0400
IGNITE-17198 Minimal implementation of pure in-memory storage (with in-memory RAFT)
---
.../management/raft/ItCmgRaftServiceTest.java | 4 +-
.../management/ClusterManagementGroupManager.java | 16 +-
.../client/ItMetaStorageRaftGroupTest.java | 7 +-
.../client/ItMetaStorageServiceTest.java | 3 +-
.../internal/metastorage/MetaStorageManager.java | 4 +-
.../apache/ignite/internal/raft/ItLozaTest.java | 3 +-
.../internal/raft/ItRaftGroupServiceTest.java | 4 +-
.../service/ItAbstractListenerSnapshotTest.java | 4 +-
.../raft/server/ItJraftCounterServerTest.java | 23 +--
.../raft/server/ItSimpleCounterServerTest.java | 9 +-
.../java/org/apache/ignite/internal/raft/Loza.java | 39 ++--
.../internal/raft/server/RaftGroupOptions.java | 78 ++++++++
.../ignite/internal/raft/server/RaftServer.java | 13 +-
.../internal/raft/server/impl/JraftServerImpl.java | 30 ++-
.../ignite/raft/jraft/JRaftServiceFactory.java | 10 +-
.../jraft/core/DefaultJRaftServiceFactory.java | 12 --
...ctory.java => VolatileJRaftServiceFactory.java} | 42 ++---
.../raft/jraft/storage/VolatileStorage.java} | 13 +-
.../raft/jraft/storage/impl/LocalLogStorage.java | 5 +-
.../storage/impl/VolatileRaftMetaStorage.java | 75 ++++++++
.../org/apache/ignite/internal/raft/LozaTest.java | 11 +-
.../internal/raft/server/impl/RaftServerImpl.java | 13 +-
.../raft/jraft/core/TestJRaftServiceFactory.java | 2 +-
...y.java => VolatileJRaftServiceFactoryTest.java} | 27 ++-
.../jraft/storage/impl/LocalLogStorageTest.java | 2 +-
.../raft/jraft/storage/impl/LogManagerTest.java | 2 +-
.../storage/impl/VolatileRaftMetaStorageTest.java | 90 +++++++++
modules/runner/pom.xml | 6 +
.../internal/AbstractClusterIntegrationTest.java | 9 +-
.../ignite/internal/compute/ItComputeTest.java | 7 -
.../inmemory/ItRaftStorageVolatilenessTest.java | 209 +++++++++++++++++++++
.../sql/engine/exec/MockedStructuresTest.java | 4 +-
.../internal/storage/engine/TableStorage.java | 7 +
.../chm/TestConcurrentHashMapTableStorage.java | 5 +
.../pagememory/AbstractPageMemoryTableStorage.java | 60 +++++-
.../PersistentPageMemoryTableStorage.java | 5 +
.../pagememory/VolatilePageMemoryTableStorage.java | 5 +
.../storage/rocksdb/RocksDbTableStorage.java | 5 +
.../storage/rocksdb/RocksDbTableStorageTest.java | 5 +
.../distributed/ItInternalTableScanTest.java | 4 +-
.../distributed/ItTxDistributedTestSingleNode.java | 4 +-
.../internal/table/distributed/TableManager.java | 18 +-
.../table/distributed/TableManagerTest.java | 6 +-
parent/pom.xml | 13 ++
44 files changed, 777 insertions(+), 136 deletions(-)
diff --git a/modules/cluster-management/src/integrationTest/java/org/apache/ignite/internal/cluster/management/raft/ItCmgRaftServiceTest.java b/modules/cluster-management/src/integrationTest/java/org/apache/ignite/internal/cluster/management/raft/ItCmgRaftServiceTest.java
index 534566ad2..451ed3ea5 100644
--- a/modules/cluster-management/src/integrationTest/java/org/apache/ignite/internal/cluster/management/raft/ItCmgRaftServiceTest.java
+++ b/modules/cluster-management/src/integrationTest/java/org/apache/ignite/internal/cluster/management/raft/ItCmgRaftServiceTest.java
@@ -17,6 +17,7 @@
package org.apache.ignite.internal.cluster.management.raft;
+import static org.apache.ignite.internal.raft.server.RaftGroupOptions.defaults;
import static org.apache.ignite.internal.testframework.IgniteTestUtils.assertThrowsWithCause;
import static org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCondition;
import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.will;
@@ -90,7 +91,8 @@ public class ItCmgRaftServiceTest {
CompletableFuture<RaftGroupService> raftService = raftManager.prepareRaftGroup(
TEST_GROUP,
List.copyOf(clusterService.topologyService().allMembers()),
- () -> new CmgRaftGroupListener(raftStorage)
+ () -> new CmgRaftGroupListener(raftStorage),
+ defaults()
);
assertThat(raftService, willCompleteSuccessfully());
diff --git a/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/ClusterManagementGroupManager.java b/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/ClusterManagementGroupManager.java
index c26653f8b..522a0b9fd 100644
--- a/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/ClusterManagementGroupManager.java
+++ b/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/ClusterManagementGroupManager.java
@@ -50,6 +50,7 @@ import org.apache.ignite.internal.cluster.management.raft.commands.JoinReadyComm
import org.apache.ignite.internal.manager.IgniteComponent;
import org.apache.ignite.internal.properties.IgniteProductVersion;
import org.apache.ignite.internal.raft.Loza;
+import org.apache.ignite.internal.raft.server.RaftGroupOptions;
import org.apache.ignite.internal.thread.NamedThreadFactory;
import org.apache.ignite.internal.util.IgniteSpinBusyLock;
import org.apache.ignite.internal.util.IgniteUtils;
@@ -480,11 +481,16 @@ public class ClusterManagementGroupManager implements IgniteComponent {
try {
return raftManager
- .prepareRaftGroup(CMG_RAFT_GROUP_NAME, nodes, () -> {
- clusterStateStorage.start();
-
- return new CmgRaftGroupListener(clusterStateStorage);
- })
+ .prepareRaftGroup(
+ CMG_RAFT_GROUP_NAME,
+ nodes,
+ () -> {
+ clusterStateStorage.start();
+
+ return new CmgRaftGroupListener(clusterStateStorage);
+ },
+ RaftGroupOptions.defaults()
+ )
.thenApply(service -> new CmgRaftService(service, clusterService));
} catch (NodeStoppingException e) {
return failedFuture(e);
diff --git a/modules/metastorage-client/src/integrationTest/java/org/apache/ignite/internal/metastorage/client/ItMetaStorageRaftGroupTest.java b/modules/metastorage-client/src/integrationTest/java/org/apache/ignite/internal/metastorage/client/ItMetaStorageRaftGroupTest.java
index f7a1712c3..74b94bc7c 100644
--- a/modules/metastorage-client/src/integrationTest/java/org/apache/ignite/internal/metastorage/client/ItMetaStorageRaftGroupTest.java
+++ b/modules/metastorage-client/src/integrationTest/java/org/apache/ignite/internal/metastorage/client/ItMetaStorageRaftGroupTest.java
@@ -17,6 +17,7 @@
package org.apache.ignite.internal.metastorage.client;
+import static org.apache.ignite.internal.raft.server.RaftGroupOptions.defaults;
import static org.apache.ignite.raft.jraft.test.TestUtils.waitForTopology;
import static org.apache.ignite.utils.ClusterServiceTestUtils.findLocalAddresses;
import static org.junit.jupiter.api.Assertions.assertEquals;
@@ -324,11 +325,11 @@ public class ItMetaStorageRaftGroupTest {
metaStorageRaftSrv3.start();
- metaStorageRaftSrv1.startRaftGroup(METASTORAGE_RAFT_GROUP_NAME, new MetaStorageListener(mockStorage), peers);
+ metaStorageRaftSrv1.startRaftGroup(METASTORAGE_RAFT_GROUP_NAME, new MetaStorageListener(mockStorage), peers, defaults());
- metaStorageRaftSrv2.startRaftGroup(METASTORAGE_RAFT_GROUP_NAME, new MetaStorageListener(mockStorage), peers);
+ metaStorageRaftSrv2.startRaftGroup(METASTORAGE_RAFT_GROUP_NAME, new MetaStorageListener(mockStorage), peers, defaults());
- metaStorageRaftSrv3.startRaftGroup(METASTORAGE_RAFT_GROUP_NAME, new MetaStorageListener(mockStorage), peers);
+ metaStorageRaftSrv3.startRaftGroup(METASTORAGE_RAFT_GROUP_NAME, new MetaStorageListener(mockStorage), peers, defaults());
metaStorageRaftGrpSvc1 = RaftGroupServiceImpl.start(
METASTORAGE_RAFT_GROUP_NAME,
diff --git a/modules/metastorage-client/src/integrationTest/java/org/apache/ignite/internal/metastorage/client/ItMetaStorageServiceTest.java b/modules/metastorage-client/src/integrationTest/java/org/apache/ignite/internal/metastorage/client/ItMetaStorageServiceTest.java
index 95464010b..0139583d4 100644
--- a/modules/metastorage-client/src/integrationTest/java/org/apache/ignite/internal/metastorage/client/ItMetaStorageServiceTest.java
+++ b/modules/metastorage-client/src/integrationTest/java/org/apache/ignite/internal/metastorage/client/ItMetaStorageServiceTest.java
@@ -28,6 +28,7 @@ import static org.apache.ignite.internal.metastorage.client.ItMetaStorageService
import static org.apache.ignite.internal.metastorage.client.Operations.ops;
import static org.apache.ignite.internal.metastorage.client.Operations.put;
import static org.apache.ignite.internal.metastorage.client.Operations.remove;
+import static org.apache.ignite.internal.raft.server.RaftGroupOptions.defaults;
import static org.apache.ignite.raft.jraft.test.TestUtils.waitForTopology;
import static org.apache.ignite.utils.ClusterServiceTestUtils.findLocalAddresses;
import static org.hamcrest.MatcherAssert.assertThat;
@@ -1063,7 +1064,7 @@ public class ItMetaStorageServiceTest {
metaStorageRaftSrv.start();
- metaStorageRaftSrv.startRaftGroup(METASTORAGE_RAFT_GROUP_NAME, new MetaStorageListener(mockStorage), peers);
+ metaStorageRaftSrv.startRaftGroup(METASTORAGE_RAFT_GROUP_NAME, new MetaStorageListener(mockStorage), peers, defaults());
metaStorageRaftGrpSvc = RaftGroupServiceImpl.start(
METASTORAGE_RAFT_GROUP_NAME,
diff --git a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/MetaStorageManager.java b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/MetaStorageManager.java
index 39a65a356..c76cbe5c5 100644
--- a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/MetaStorageManager.java
+++ b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/MetaStorageManager.java
@@ -48,6 +48,7 @@ import org.apache.ignite.internal.metastorage.watch.AggregatedWatch;
import org.apache.ignite.internal.metastorage.watch.KeyCriterion;
import org.apache.ignite.internal.metastorage.watch.WatchAggregator;
import org.apache.ignite.internal.raft.Loza;
+import org.apache.ignite.internal.raft.server.RaftGroupOptions;
import org.apache.ignite.internal.util.Cursor;
import org.apache.ignite.internal.util.IgniteSpinBusyLock;
import org.apache.ignite.internal.util.IgniteUtils;
@@ -177,7 +178,8 @@ public class MetaStorageManager implements IgniteComponent {
CompletableFuture<RaftGroupService> raftServiceFuture = raftMgr.prepareRaftGroup(
METASTORAGE_RAFT_GROUP_NAME,
metastorageNodes,
- () -> new MetaStorageListener(storage)
+ () -> new MetaStorageListener(storage),
+ RaftGroupOptions.defaults()
);
return raftServiceFuture.thenApply(service -> new MetaStorageServiceImpl(service, thisNode.id()));
diff --git a/modules/raft/src/integrationTest/java/org/apache/ignite/internal/raft/ItLozaTest.java b/modules/raft/src/integrationTest/java/org/apache/ignite/internal/raft/ItLozaTest.java
index 13012af8c..1e26ea10c 100644
--- a/modules/raft/src/integrationTest/java/org/apache/ignite/internal/raft/ItLozaTest.java
+++ b/modules/raft/src/integrationTest/java/org/apache/ignite/internal/raft/ItLozaTest.java
@@ -17,6 +17,7 @@
package org.apache.ignite.internal.raft;
+import static org.apache.ignite.internal.raft.server.RaftGroupOptions.defaults;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.containsString;
import static org.mockito.ArgumentMatchers.any;
@@ -66,7 +67,7 @@ public class ItLozaTest {
*/
private RaftGroupService startClient(String groupId, ClusterNode node, Loza loza) throws Exception {
return loza.prepareRaftGroup(groupId,
- List.of(node), () -> mock(RaftGroupListener.class)
+ List.of(node), () -> mock(RaftGroupListener.class), defaults()
).get(10, TimeUnit.SECONDS);
}
diff --git a/modules/raft/src/integrationTest/java/org/apache/ignite/internal/raft/ItRaftGroupServiceTest.java b/modules/raft/src/integrationTest/java/org/apache/ignite/internal/raft/ItRaftGroupServiceTest.java
index cb72190bc..735ce9a4b 100644
--- a/modules/raft/src/integrationTest/java/org/apache/ignite/internal/raft/ItRaftGroupServiceTest.java
+++ b/modules/raft/src/integrationTest/java/org/apache/ignite/internal/raft/ItRaftGroupServiceTest.java
@@ -17,6 +17,7 @@
package org.apache.ignite.internal.raft;
+import static org.apache.ignite.internal.raft.server.RaftGroupOptions.defaults;
import static org.apache.ignite.raft.jraft.test.TestUtils.waitForCondition;
import static org.apache.ignite.raft.jraft.test.TestUtils.waitForTopology;
import static org.apache.ignite.utils.ClusterServiceTestUtils.findLocalAddresses;
@@ -100,7 +101,8 @@ public class ItRaftGroupServiceTest {
CompletableFuture<RaftGroupService> raftGroupServiceFuture = raftSrvs.get(i).prepareRaftGroup(
RAFT_GROUP_NAME,
nodes,
- () -> mock(RaftGroupListener.class)
+ () -> mock(RaftGroupListener.class),
+ defaults()
);
svcFutures[i] = raftGroupServiceFuture;
diff --git a/modules/raft/src/integrationTest/java/org/apache/ignite/raft/client/service/ItAbstractListenerSnapshotTest.java b/modules/raft/src/integrationTest/java/org/apache/ignite/raft/client/service/ItAbstractListenerSnapshotTest.java
index 8066e5535..92038256d 100644
--- a/modules/raft/src/integrationTest/java/org/apache/ignite/raft/client/service/ItAbstractListenerSnapshotTest.java
+++ b/modules/raft/src/integrationTest/java/org/apache/ignite/raft/client/service/ItAbstractListenerSnapshotTest.java
@@ -17,6 +17,7 @@
package org.apache.ignite.raft.client.service;
+import static org.apache.ignite.internal.raft.server.RaftGroupOptions.defaults;
import static org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCondition;
import static org.junit.jupiter.api.Assertions.assertTrue;
@@ -386,7 +387,8 @@ public abstract class ItAbstractListenerSnapshotTest<T extends RaftGroupListener
server.startRaftGroup(
raftGroupId(),
createListener(service, listenerPersistencePath),
- INITIAL_CONF
+ INITIAL_CONF,
+ defaults()
);
return server;
diff --git a/modules/raft/src/integrationTest/java/org/apache/ignite/raft/server/ItJraftCounterServerTest.java b/modules/raft/src/integrationTest/java/org/apache/ignite/raft/server/ItJraftCounterServerTest.java
index cd36eb2e1..fae72c1a5 100644
--- a/modules/raft/src/integrationTest/java/org/apache/ignite/raft/server/ItJraftCounterServerTest.java
+++ b/modules/raft/src/integrationTest/java/org/apache/ignite/raft/server/ItJraftCounterServerTest.java
@@ -20,6 +20,7 @@ package org.apache.ignite.raft.server;
import static java.util.Comparator.comparing;
import static java.util.stream.Collectors.toList;
import static java.util.stream.Collectors.toSet;
+import static org.apache.ignite.internal.raft.server.RaftGroupOptions.defaults;
import static org.apache.ignite.raft.jraft.core.State.STATE_ERROR;
import static org.apache.ignite.raft.jraft.core.State.STATE_LEADER;
import static org.apache.ignite.raft.jraft.test.TestUtils.getLocalAddress;
@@ -266,8 +267,8 @@ class ItJraftCounterServerTest extends RaftServerAbstractTest {
private void startCluster() throws Exception {
for (int i = 0; i < 3; i++) {
startServer(i, raftServer -> {
- raftServer.startRaftGroup(COUNTER_GROUP_0, listenerFactory.get(), INITIAL_CONF);
- raftServer.startRaftGroup(COUNTER_GROUP_1, listenerFactory.get(), INITIAL_CONF);
+ raftServer.startRaftGroup(COUNTER_GROUP_0, listenerFactory.get(), INITIAL_CONF, defaults());
+ raftServer.startRaftGroup(COUNTER_GROUP_1, listenerFactory.get(), INITIAL_CONF, defaults());
}, opts -> {});
}
@@ -281,7 +282,7 @@ class ItJraftCounterServerTest extends RaftServerAbstractTest {
@Test
public void testDisruptorThreadsCount() {
startServer(0, raftServer -> {
- raftServer.startRaftGroup("test_raft_group", listenerFactory.get(), INITIAL_CONF);
+ raftServer.startRaftGroup("test_raft_group", listenerFactory.get(), INITIAL_CONF, defaults());
}, opts -> {});
Set<Thread> threads = getAllDisruptorCurrentThreads();
@@ -294,7 +295,7 @@ class ItJraftCounterServerTest extends RaftServerAbstractTest {
servers.forEach(srv -> {
for (int i = 0; i < 10; i++) {
- srv.startRaftGroup("test_raft_group_" + i, listenerFactory.get(), INITIAL_CONF);
+ srv.startRaftGroup("test_raft_group_" + i, listenerFactory.get(), INITIAL_CONF, defaults());
}
});
@@ -673,9 +674,9 @@ class ItJraftCounterServerTest extends RaftServerAbstractTest {
@Override public void run() {
String grp = "counter" + finalI;
- srv0.startRaftGroup(grp, listenerFactory.get(), INITIAL_CONF);
- srv1.startRaftGroup(grp, listenerFactory.get(), INITIAL_CONF);
- srv2.startRaftGroup(grp, listenerFactory.get(), INITIAL_CONF);
+ srv0.startRaftGroup(grp, listenerFactory.get(), INITIAL_CONF, defaults());
+ srv1.startRaftGroup(grp, listenerFactory.get(), INITIAL_CONF, defaults());
+ srv2.startRaftGroup(grp, listenerFactory.get(), INITIAL_CONF, defaults());
}
}));
}
@@ -803,8 +804,8 @@ class ItJraftCounterServerTest extends RaftServerAbstractTest {
}
var svc2 = startServer(stopIdx, r -> {
- r.startRaftGroup(COUNTER_GROUP_0, listenerFactory.get(), INITIAL_CONF);
- r.startRaftGroup(COUNTER_GROUP_1, listenerFactory.get(), INITIAL_CONF);
+ r.startRaftGroup(COUNTER_GROUP_0, listenerFactory.get(), INITIAL_CONF, defaults());
+ r.startRaftGroup(COUNTER_GROUP_1, listenerFactory.get(), INITIAL_CONF, defaults());
}, opts -> {});
waitForCondition(() -> validateStateMachine(sum(20), svc2, COUNTER_GROUP_0), 5_000);
@@ -818,8 +819,8 @@ class ItJraftCounterServerTest extends RaftServerAbstractTest {
svc2.stop();
var svc3 = startServer(stopIdx, r -> {
- r.startRaftGroup(COUNTER_GROUP_0, listenerFactory.get(), INITIAL_CONF);
- r.startRaftGroup(COUNTER_GROUP_1, listenerFactory.get(), INITIAL_CONF);
+ r.startRaftGroup(COUNTER_GROUP_0, listenerFactory.get(), INITIAL_CONF, defaults());
+ r.startRaftGroup(COUNTER_GROUP_1, listenerFactory.get(), INITIAL_CONF, defaults());
}, opts -> {});
waitForCondition(() -> validateStateMachine(sum(20), svc3, COUNTER_GROUP_0), 5_000);
diff --git a/modules/raft/src/integrationTest/java/org/apache/ignite/raft/server/ItSimpleCounterServerTest.java b/modules/raft/src/integrationTest/java/org/apache/ignite/raft/server/ItSimpleCounterServerTest.java
index 14f343def..46da74d31 100644
--- a/modules/raft/src/integrationTest/java/org/apache/ignite/raft/server/ItSimpleCounterServerTest.java
+++ b/modules/raft/src/integrationTest/java/org/apache/ignite/raft/server/ItSimpleCounterServerTest.java
@@ -17,6 +17,7 @@
package org.apache.ignite.raft.server;
+import static org.apache.ignite.internal.raft.server.RaftGroupOptions.defaults;
import static org.apache.ignite.raft.jraft.test.TestUtils.waitForTopology;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
@@ -106,8 +107,12 @@ class ItSimpleCounterServerTest extends RaftServerAbstractTest {
ClusterNode serverNode = server.clusterService().topologyService().localMember();
- assertTrue(server.startRaftGroup(COUNTER_GROUP_ID_0, new CounterListener(), List.of(new Peer(serverNode.address()))));
- assertTrue(server.startRaftGroup(COUNTER_GROUP_ID_1, new CounterListener(), List.of(new Peer(serverNode.address()))));
+ assertTrue(
+ server.startRaftGroup(COUNTER_GROUP_ID_0, new CounterListener(), List.of(new Peer(serverNode.address())), defaults())
+ );
+ assertTrue(
+ server.startRaftGroup(COUNTER_GROUP_ID_1, new CounterListener(), List.of(new Peer(serverNode.address())), defaults())
+ );
ClusterService clientNode1 = clusterService(PORT + 1, List.of(addr), true);
diff --git a/modules/raft/src/main/java/org/apache/ignite/internal/raft/Loza.java b/modules/raft/src/main/java/org/apache/ignite/internal/raft/Loza.java
index d32e4d338..af3a5ed10 100644
--- a/modules/raft/src/main/java/org/apache/ignite/internal/raft/Loza.java
+++ b/modules/raft/src/main/java/org/apache/ignite/internal/raft/Loza.java
@@ -30,6 +30,7 @@ import java.util.function.Supplier;
import java.util.stream.Collectors;
import org.apache.ignite.internal.manager.IgniteComponent;
import org.apache.ignite.internal.raft.server.RaftGroupEventsListener;
+import org.apache.ignite.internal.raft.server.RaftGroupOptions;
import org.apache.ignite.internal.raft.server.RaftServer;
import org.apache.ignite.internal.raft.server.impl.JraftServerImpl;
import org.apache.ignite.internal.thread.NamedThreadFactory;
@@ -152,20 +153,22 @@ public class Loza implements IgniteComponent {
* @param groupId Raft group id.
* @param nodes Raft group nodes.
* @param lsnrSupplier Raft group listener supplier.
+ * @param groupOptions Options to apply to the group.
* @return Future representing pending completion of the operation.
* @throws NodeStoppingException If node stopping intention was detected.
*/
public CompletableFuture<RaftGroupService> prepareRaftGroup(
String groupId,
List<ClusterNode> nodes,
- Supplier<RaftGroupListener> lsnrSupplier
+ Supplier<RaftGroupListener> lsnrSupplier,
+ RaftGroupOptions groupOptions
) throws NodeStoppingException {
if (!busyLock.enterBusy()) {
throw new NodeStoppingException();
}
try {
- return prepareRaftGroupInternal(groupId, nodes, lsnrSupplier, () -> RaftGroupEventsListener.noopLsnr);
+ return prepareRaftGroupInternal(groupId, nodes, lsnrSupplier, () -> RaftGroupEventsListener.noopLsnr, groupOptions);
} finally {
busyLock.leaveBusy();
}
@@ -178,10 +181,16 @@ public class Loza implements IgniteComponent {
* @param nodes Raft group nodes.
* @param lsnrSupplier Raft group listener supplier.
* @param raftGrpEvtsLsnrSupplier Raft group events listener supplier.
+ * @param groupOptions Options to apply to the group.
* @return Future representing pending completion of the operation.
*/
- private CompletableFuture<RaftGroupService> prepareRaftGroupInternal(String groupId, List<ClusterNode> nodes,
- Supplier<RaftGroupListener> lsnrSupplier, Supplier<RaftGroupEventsListener> raftGrpEvtsLsnrSupplier) {
+ private CompletableFuture<RaftGroupService> prepareRaftGroupInternal(
+ String groupId,
+ List<ClusterNode> nodes,
+ Supplier<RaftGroupListener> lsnrSupplier,
+ Supplier<RaftGroupEventsListener> raftGrpEvtsLsnrSupplier,
+ RaftGroupOptions groupOptions
+ ) {
assert !nodes.isEmpty();
List<Peer> peers = nodes.stream().map(n -> new Peer(n.address())).collect(Collectors.toList());
@@ -193,7 +202,7 @@ public class Loza implements IgniteComponent {
if (hasLocalRaft) {
LOG.info("Start new raft node for group={} with initial peers={}", groupId, peers);
- if (!raftServer.startRaftGroup(groupId, raftGrpEvtsLsnrSupplier.get(), lsnrSupplier.get(), peers)) {
+ if (!raftServer.startRaftGroup(groupId, raftGrpEvtsLsnrSupplier.get(), lsnrSupplier.get(), peers, groupOptions)) {
throw new IgniteInternalException(IgniteStringFormatter.format(
"Raft group on the node is already started [node={}, raftGrp={}]",
locNodeName,
@@ -223,6 +232,7 @@ public class Loza implements IgniteComponent {
* @param deltaNodes New raft group nodes.
* @param lsnrSupplier Raft group listener supplier.
* @param raftGrpEvtsLsnrSupplier Raft group events listener supplier.
+ * @param groupOptions Options to apply to the group.
* @throws NodeStoppingException If node stopping intention was detected.
*/
public void startRaftGroupNode(
@@ -230,7 +240,9 @@ public class Loza implements IgniteComponent {
Collection<ClusterNode> nodes,
Collection<ClusterNode> deltaNodes,
Supplier<RaftGroupListener> lsnrSupplier,
- Supplier<RaftGroupEventsListener> raftGrpEvtsLsnrSupplier) throws NodeStoppingException {
+ Supplier<RaftGroupEventsListener> raftGrpEvtsLsnrSupplier,
+ RaftGroupOptions groupOptions
+ ) throws NodeStoppingException {
assert !nodes.isEmpty();
if (!busyLock.enterBusy()) {
@@ -245,7 +257,7 @@ public class Loza implements IgniteComponent {
if (deltaNodes.stream().anyMatch(n -> locNodeName.equals(n.name()))) {
LOG.info("Start new raft node for group={} with initial peers={}", grpId, peers);
- if (!raftServer.startRaftGroup(grpId, raftGrpEvtsLsnrSupplier.get(), lsnrSupplier.get(), peers)) {
+ if (!raftServer.startRaftGroup(grpId, raftGrpEvtsLsnrSupplier.get(), lsnrSupplier.get(), peers, groupOptions)) {
throw new IgniteInternalException(IgniteStringFormatter.format(
"Raft group on the node is already started [node={}, raftGrp={}]",
locNodeName,
@@ -267,6 +279,7 @@ public class Loza implements IgniteComponent {
* @param deltaNodes New raft group nodes.
* @param lsnrSupplier Raft group listener supplier.
* @param raftGrpEvtsLsnrSupplier Raft group events listener supplier.
+ * @param groupOptions Options to apply to the group.
* @return Future representing pending completion of the operation.
* @throws NodeStoppingException If node stopping intention was detected.
*/
@@ -275,14 +288,15 @@ public class Loza implements IgniteComponent {
Collection<ClusterNode> nodes,
Collection<ClusterNode> deltaNodes,
Supplier<RaftGroupListener> lsnrSupplier,
- Supplier<RaftGroupEventsListener> raftGrpEvtsLsnrSupplier
+ Supplier<RaftGroupEventsListener> raftGrpEvtsLsnrSupplier,
+ RaftGroupOptions groupOptions
) throws NodeStoppingException {
if (!busyLock.enterBusy()) {
throw new NodeStoppingException();
}
try {
- return updateRaftGroupInternal(grpId, nodes, deltaNodes, lsnrSupplier, raftGrpEvtsLsnrSupplier);
+ return updateRaftGroupInternal(grpId, nodes, deltaNodes, lsnrSupplier, raftGrpEvtsLsnrSupplier, groupOptions);
} finally {
busyLock.leaveBusy();
}
@@ -296,6 +310,7 @@ public class Loza implements IgniteComponent {
* @param deltaNodes New raft group nodes.
* @param lsnrSupplier Raft group listener supplier.
* @param raftGrpEvtsLsnrSupplier Raft group events listener supplier.
+ * @param groupOptions Options to apply to the group.
* @return Future representing pending completion of the operation.
*/
private CompletableFuture<RaftGroupService> updateRaftGroupInternal(
@@ -303,7 +318,9 @@ public class Loza implements IgniteComponent {
Collection<ClusterNode> nodes,
Collection<ClusterNode> deltaNodes,
Supplier<RaftGroupListener> lsnrSupplier,
- Supplier<RaftGroupEventsListener> raftGrpEvtsLsnrSupplier) {
+ Supplier<RaftGroupEventsListener> raftGrpEvtsLsnrSupplier,
+ RaftGroupOptions groupOptions
+ ) {
assert !nodes.isEmpty();
List<Peer> peers = nodes.stream().map(n -> new Peer(n.address())).collect(Collectors.toList());
@@ -313,7 +330,7 @@ public class Loza implements IgniteComponent {
if (deltaNodes.stream().anyMatch(n -> locNodeName.equals(n.name()))) {
LOG.info("Start new raft node for group={} with initial peers={}", grpId, peers);
- if (!raftServer.startRaftGroup(grpId, raftGrpEvtsLsnrSupplier.get(), lsnrSupplier.get(), peers)) {
+ if (!raftServer.startRaftGroup(grpId, raftGrpEvtsLsnrSupplier.get(), lsnrSupplier.get(), peers, groupOptions)) {
throw new IgniteInternalException(IgniteStringFormatter.format(
"Raft group on the node is already started [node={}, raftGrp={}]",
locNodeName,
diff --git a/modules/raft/src/main/java/org/apache/ignite/internal/raft/server/RaftGroupOptions.java b/modules/raft/src/main/java/org/apache/ignite/internal/raft/server/RaftGroupOptions.java
new file mode 100644
index 000000000..4880e3188
--- /dev/null
+++ b/modules/raft/src/main/java/org/apache/ignite/internal/raft/server/RaftGroupOptions.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.raft.server;
+
+/**
+ * Options specific to a Raft group that is being started. Immutable; instances are obtained through the static factory methods.
+ */
+public class RaftGroupOptions {
+ /** Whether volatile stores should be used for the corresponding Raft Group. Classic Raft uses persistent ones. */
+ private final boolean volatileStores;
+
+ /**
+ * Returns default options as defined by classic Raft (so stores are persistent); delegates to {@link #forPersistentStores()}.
+ *
+ * @return Default options (with persistent Raft stores).
+ */
+ public static RaftGroupOptions defaults() {
+ return forPersistentStores();
+ }
+
+ /**
+ * Returns options with persistent Raft stores.
+ *
+ * @return Options for which {@link #volatileStores()} is {@code false}.
+ */
+ public static RaftGroupOptions forPersistentStores() {
+ return new RaftGroupOptions(false);
+ }
+
+ /**
+ * Returns options with volatile Raft stores.
+ *
+ * @return Options for which {@link #volatileStores()} is {@code true}.
+ */
+ public static RaftGroupOptions forVolatileStores() {
+ return new RaftGroupOptions(true);
+ }
+
+ /**
+ * Creates options derived from table configuration: volatile stores for a volatile table, persistent stores otherwise.
+ *
+ * @param isVolatile Whether the table is configured as volatile (in-memory) or not.
+ * @return Options derived from table configuration.
+ */
+ public static RaftGroupOptions forTable(boolean isVolatile) {
+ return isVolatile ? forVolatileStores() : forPersistentStores();
+ }
+
+ private RaftGroupOptions(boolean volatileStores) {
+ this.volatileStores = volatileStores;
+ }
+
+ /**
+ * Tells which kind of storages the Raft group should use for its metadata and logs: volatile ones or
+ * persistent ones.
+ *
+ * @return {@code true} if the Raft group should store its metadata and logs in volatile storages, or {@code false}
+ * if these storages should be persistent.
+ */
+ public boolean volatileStores() {
+ return volatileStores;
+ }
+}
diff --git a/modules/raft/src/main/java/org/apache/ignite/internal/raft/server/RaftServer.java b/modules/raft/src/main/java/org/apache/ignite/internal/raft/server/RaftServer.java
index 3e795d73f..d57690e7a 100644
--- a/modules/raft/src/main/java/org/apache/ignite/internal/raft/server/RaftServer.java
+++ b/modules/raft/src/main/java/org/apache/ignite/internal/raft/server/RaftServer.java
@@ -43,9 +43,10 @@ public interface RaftServer extends IgniteComponent {
* @param groupId Group id.
* @param lsnr The listener.
* @param initialConf Initial group configuration.
+ * @param groupOptions Options to apply to the group.
* @return {@code True} if a group was successfully started, {@code False} when a group with the given name already exists.
*/
- boolean startRaftGroup(String groupId, RaftGroupListener lsnr, List<Peer> initialConf);
+ boolean startRaftGroup(String groupId, RaftGroupListener lsnr, List<Peer> initialConf, RaftGroupOptions groupOptions);
/**
* Starts a raft group bound to this cluster node.
@@ -54,10 +55,16 @@ public interface RaftServer extends IgniteComponent {
* @param evLsnr Listener for group membership and other events.
* @param lsnr Listener for state machine events.
* @param initialConf Initial group configuration.
+ * @param groupOptions Options to apply to the group.
* @return {@code True} if a group was successfully started, {@code False} when a group with the given name already exists.
*/
- boolean startRaftGroup(String groupId, RaftGroupEventsListener evLsnr,
- RaftGroupListener lsnr, List<Peer> initialConf);
+ boolean startRaftGroup(
+ String groupId,
+ RaftGroupEventsListener evLsnr,
+ RaftGroupListener lsnr,
+ List<Peer> initialConf,
+ RaftGroupOptions groupOptions
+ );
/**
* Synchronously stops a raft group if any.
diff --git a/modules/raft/src/main/java/org/apache/ignite/internal/raft/server/impl/JraftServerImpl.java b/modules/raft/src/main/java/org/apache/ignite/internal/raft/server/impl/JraftServerImpl.java
index d15693164..438f96729 100644
--- a/modules/raft/src/main/java/org/apache/ignite/internal/raft/server/impl/JraftServerImpl.java
+++ b/modules/raft/src/main/java/org/apache/ignite/internal/raft/server/impl/JraftServerImpl.java
@@ -33,6 +33,7 @@ import java.util.concurrent.ExecutorService;
import java.util.function.BiPredicate;
import java.util.stream.Collectors;
import org.apache.ignite.internal.raft.server.RaftGroupEventsListener;
+import org.apache.ignite.internal.raft.server.RaftGroupOptions;
import org.apache.ignite.internal.raft.server.RaftServer;
import org.apache.ignite.internal.thread.NamedThreadFactory;
import org.apache.ignite.lang.IgniteInternalException;
@@ -57,6 +58,7 @@ import org.apache.ignite.raft.jraft.core.FSMCallerImpl;
import org.apache.ignite.raft.jraft.core.NodeImpl;
import org.apache.ignite.raft.jraft.core.ReadOnlyServiceImpl;
import org.apache.ignite.raft.jraft.core.StateMachineAdapter;
+import org.apache.ignite.raft.jraft.core.VolatileJRaftServiceFactory;
import org.apache.ignite.raft.jraft.disruptor.StripedDisruptor;
import org.apache.ignite.raft.jraft.entity.PeerId;
import org.apache.ignite.raft.jraft.error.RaftError;
@@ -71,7 +73,6 @@ import org.apache.ignite.raft.jraft.storage.snapshot.SnapshotWriter;
import org.apache.ignite.raft.jraft.util.ExecutorServiceHelper;
import org.apache.ignite.raft.jraft.util.ExponentialBackoffTimeoutStrategy;
import org.apache.ignite.raft.jraft.util.JDKMarshaller;
-import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
import org.jetbrains.annotations.TestOnly;
@@ -316,14 +317,24 @@ public class JraftServerImpl implements RaftServer {
/** {@inheritDoc} */
@Override
- public synchronized boolean startRaftGroup(String groupId, RaftGroupListener lsnr, @Nullable List<Peer> initialConf) {
- return startRaftGroup(groupId, RaftGroupEventsListener.noopLsnr, lsnr, initialConf);
+ public synchronized boolean startRaftGroup(
+ String groupId,
+ RaftGroupListener lsnr,
+ @Nullable List<Peer> initialConf,
+ RaftGroupOptions groupOptions
+ ) {
+ return startRaftGroup(groupId, RaftGroupEventsListener.noopLsnr, lsnr, initialConf, groupOptions);
}
/** {@inheritDoc} */
@Override
- public synchronized boolean startRaftGroup(String groupId, @NotNull RaftGroupEventsListener evLsnr,
- RaftGroupListener lsnr, @Nullable List<Peer> initialConf) {
+ public synchronized boolean startRaftGroup(
+ String groupId,
+ RaftGroupEventsListener evLsnr,
+ RaftGroupListener lsnr,
+ @Nullable List<Peer> initialConf,
+ RaftGroupOptions groupOptions
+ ) {
if (groups.containsKey(groupId)) {
return false;
}
@@ -331,6 +342,7 @@ public class JraftServerImpl implements RaftServer {
// Thread pools are shared by all raft groups.
NodeOptions nodeOptions = opts.copy();
+ // TODO: IGNITE-17083 - Do not create paths for volatile stores at all when we get rid of snapshot storage on FS.
Path serverDataPath = getServerDataPath(groupId);
try {
@@ -339,13 +351,19 @@ public class JraftServerImpl implements RaftServer {
throw new IgniteInternalException(e);
}
- nodeOptions.setRaftMetaUri(serverDataPath.resolve("meta").toString());
+ if (!groupOptions.volatileStores()) {
+ nodeOptions.setRaftMetaUri(serverDataPath.resolve("meta").toString());
+ }
nodeOptions.setSnapshotUri(serverDataPath.resolve("snapshot").toString());
nodeOptions.setFsm(new DelegatingStateMachine(lsnr));
nodeOptions.setRaftGrpEvtsLsnr(evLsnr);
+ if (groupOptions.volatileStores()) {
+ nodeOptions.setServiceFactory(new VolatileJRaftServiceFactory());
+ }
+
if (initialConf != null) {
List<PeerId> mapped = initialConf.stream().map(PeerId::fromPeer).collect(Collectors.toList());
diff --git a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/JRaftServiceFactory.java b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/JRaftServiceFactory.java
index 2ebf6c260..5ec6b4efc 100644
--- a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/JRaftServiceFactory.java
+++ b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/JRaftServiceFactory.java
@@ -17,11 +17,13 @@
package org.apache.ignite.raft.jraft;
import org.apache.ignite.raft.jraft.entity.codec.LogEntryCodecFactory;
+import org.apache.ignite.raft.jraft.entity.codec.v1.LogEntryV1CodecFactory;
import org.apache.ignite.raft.jraft.option.NodeOptions;
import org.apache.ignite.raft.jraft.option.RaftOptions;
import org.apache.ignite.raft.jraft.storage.LogStorage;
import org.apache.ignite.raft.jraft.storage.RaftMetaStorage;
import org.apache.ignite.raft.jraft.storage.SnapshotStorage;
+import org.apache.ignite.raft.jraft.util.timer.DefaultRaftTimerFactory;
import org.apache.ignite.raft.jraft.util.timer.RaftTimerFactory;
/**
@@ -60,12 +62,16 @@ public interface JRaftServiceFactory {
*
* @return a codec factory to create encoder/decoder for raft log entry.
*/
- LogEntryCodecFactory createLogEntryCodecFactory();
+ default LogEntryCodecFactory createLogEntryCodecFactory() {
+ return LogEntryV1CodecFactory.getInstance();
+ }
/**
* Creates raft timer factory.
*
* @return The factory.
*/
- RaftTimerFactory createRaftTimerFactory();
+ default RaftTimerFactory createRaftTimerFactory() {
+ return new DefaultRaftTimerFactory();
+ }
}
diff --git a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/DefaultJRaftServiceFactory.java b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/DefaultJRaftServiceFactory.java
index 39aa02b70..c6c71bcf5 100644
--- a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/DefaultJRaftServiceFactory.java
+++ b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/DefaultJRaftServiceFactory.java
@@ -17,8 +17,6 @@
package org.apache.ignite.raft.jraft.core;
import org.apache.ignite.raft.jraft.JRaftServiceFactory;
-import org.apache.ignite.raft.jraft.entity.codec.LogEntryCodecFactory;
-import org.apache.ignite.raft.jraft.entity.codec.v1.LogEntryV1CodecFactory;
import org.apache.ignite.raft.jraft.option.RaftOptions;
import org.apache.ignite.raft.jraft.storage.LogStorage;
import org.apache.ignite.raft.jraft.storage.LogStorageFactory;
@@ -28,8 +26,6 @@ import org.apache.ignite.raft.jraft.storage.impl.LocalRaftMetaStorage;
import org.apache.ignite.raft.jraft.storage.snapshot.local.LocalSnapshotStorage;
import org.apache.ignite.raft.jraft.util.Requires;
import org.apache.ignite.raft.jraft.util.StringUtils;
-import org.apache.ignite.raft.jraft.util.timer.DefaultRaftTimerFactory;
-import org.apache.ignite.raft.jraft.util.timer.RaftTimerFactory;
/**
* The default factory for JRaft services.
@@ -57,12 +53,4 @@ public class DefaultJRaftServiceFactory implements JRaftServiceFactory {
Requires.requireTrue(!StringUtils.isBlank(uri), "Blank raft meta storage uri.");
return new LocalRaftMetaStorage(uri, raftOptions);
}
-
- @Override public LogEntryCodecFactory createLogEntryCodecFactory() {
- return LogEntryV1CodecFactory.getInstance();
- }
-
- @Override public RaftTimerFactory createRaftTimerFactory() {
- return new DefaultRaftTimerFactory();
- }
}
diff --git a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/DefaultJRaftServiceFactory.java b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/VolatileJRaftServiceFactory.java
similarity index 51%
copy from modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/DefaultJRaftServiceFactory.java
copy to modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/VolatileJRaftServiceFactory.java
index 39aa02b70..6eb22616e 100644
--- a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/DefaultJRaftServiceFactory.java
+++ b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/VolatileJRaftServiceFactory.java
@@ -17,52 +17,38 @@
package org.apache.ignite.raft.jraft.core;
import org.apache.ignite.raft.jraft.JRaftServiceFactory;
-import org.apache.ignite.raft.jraft.entity.codec.LogEntryCodecFactory;
-import org.apache.ignite.raft.jraft.entity.codec.v1.LogEntryV1CodecFactory;
import org.apache.ignite.raft.jraft.option.RaftOptions;
import org.apache.ignite.raft.jraft.storage.LogStorage;
-import org.apache.ignite.raft.jraft.storage.LogStorageFactory;
import org.apache.ignite.raft.jraft.storage.RaftMetaStorage;
import org.apache.ignite.raft.jraft.storage.SnapshotStorage;
-import org.apache.ignite.raft.jraft.storage.impl.LocalRaftMetaStorage;
+import org.apache.ignite.raft.jraft.storage.impl.LocalLogStorage;
+import org.apache.ignite.raft.jraft.storage.impl.VolatileRaftMetaStorage;
import org.apache.ignite.raft.jraft.storage.snapshot.local.LocalSnapshotStorage;
import org.apache.ignite.raft.jraft.util.Requires;
import org.apache.ignite.raft.jraft.util.StringUtils;
-import org.apache.ignite.raft.jraft.util.timer.DefaultRaftTimerFactory;
-import org.apache.ignite.raft.jraft.util.timer.RaftTimerFactory;
/**
- * The default factory for JRaft services.
+ * The factory for JRaft services producing volatile stores. Useful for Raft groups hosting partitions of in-memory tables.
*/
-public class DefaultJRaftServiceFactory implements JRaftServiceFactory {
-
- private final LogStorageFactory logStorageFactory;
-
- public DefaultJRaftServiceFactory(LogStorageFactory factory) {
- logStorageFactory = factory;
- }
-
- @Override public LogStorage createLogStorage(final String groupId, final RaftOptions raftOptions) {
+public class VolatileJRaftServiceFactory implements JRaftServiceFactory {
+ @Override
+ public LogStorage createLogStorage(final String groupId, final RaftOptions raftOptions) {
Requires.requireTrue(StringUtils.isNotBlank(groupId), "Blank group id.");
- return logStorageFactory.getLogStorage(groupId, raftOptions);
+ return new LocalLogStorage(raftOptions);
}
- @Override public SnapshotStorage createSnapshotStorage(final String uri, final RaftOptions raftOptions) {
+ @Override
+ public SnapshotStorage createSnapshotStorage(final String uri, final RaftOptions raftOptions) {
Requires.requireTrue(!StringUtils.isBlank(uri), "Blank snapshot storage uri.");
- return new LocalSnapshotStorage(uri, raftOptions);
- }
- @Override public RaftMetaStorage createRaftMetaStorage(final String uri, final RaftOptions raftOptions) {
- Requires.requireTrue(!StringUtils.isBlank(uri), "Blank raft meta storage uri.");
- return new LocalRaftMetaStorage(uri, raftOptions);
- }
+ // TODO: IGNITE-17083 - return an in-memory store here (or get rid of SnapshotStorage)
- @Override public LogEntryCodecFactory createLogEntryCodecFactory() {
- return LogEntryV1CodecFactory.getInstance();
+ return new LocalSnapshotStorage(uri, raftOptions);
}
- @Override public RaftTimerFactory createRaftTimerFactory() {
- return new DefaultRaftTimerFactory();
+ @Override
+ public RaftMetaStorage createRaftMetaStorage(final String uri, final RaftOptions raftOptions) {
+ return new VolatileRaftMetaStorage();
}
}
diff --git a/modules/raft/src/test/java/org/apache/ignite/raft/jraft/storage/impl/LocalLogStorageTest.java b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/storage/VolatileStorage.java
similarity index 68%
copy from modules/raft/src/test/java/org/apache/ignite/raft/jraft/storage/impl/LocalLogStorageTest.java
copy to modules/raft/src/main/java/org/apache/ignite/raft/jraft/storage/VolatileStorage.java
index f689eb94c..215742468 100644
--- a/modules/raft/src/test/java/org/apache/ignite/raft/jraft/storage/impl/LocalLogStorageTest.java
+++ b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/storage/VolatileStorage.java
@@ -14,14 +14,11 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.ignite.raft.jraft.storage.impl;
-import org.apache.ignite.raft.jraft.option.RaftOptions;
-import org.apache.ignite.raft.jraft.storage.LogStorage;
+package org.apache.ignite.raft.jraft.storage;
-public class LocalLogStorageTest extends BaseLogStorageTest {
- @Override
- protected LogStorage newLogStorage() {
- return new LocalLogStorage(this.path.toString(), new RaftOptions());
- }
+/**
+ * Marker interface for a storage that is volatile (i.e. it keeps its data in memory).
+ */
+public interface VolatileStorage {
}
diff --git a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/storage/impl/LocalLogStorage.java b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/storage/impl/LocalLogStorage.java
index 7531b8718..1695036b0 100644
--- a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/storage/impl/LocalLogStorage.java
+++ b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/storage/impl/LocalLogStorage.java
@@ -32,13 +32,14 @@ import org.apache.ignite.raft.jraft.entity.codec.LogEntryEncoder;
import org.apache.ignite.raft.jraft.option.LogStorageOptions;
import org.apache.ignite.raft.jraft.option.RaftOptions;
import org.apache.ignite.raft.jraft.storage.LogStorage;
+import org.apache.ignite.raft.jraft.storage.VolatileStorage;
import org.apache.ignite.raft.jraft.util.Describer;
import org.apache.ignite.raft.jraft.util.Requires;
/**
* Stores log in heap.
*/
-public class LocalLogStorage implements LogStorage, Describer {
+public class LocalLogStorage implements LogStorage, Describer, VolatileStorage {
private static final IgniteLogger LOG = IgniteLogger.forClass(LocalLogStorage.class);
private final ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
@@ -56,7 +57,7 @@ public class LocalLogStorage implements LogStorage, Describer {
private volatile boolean initialized = false;
- public LocalLogStorage(final String path, final RaftOptions raftOptions) {
+ public LocalLogStorage(final RaftOptions raftOptions) {
super();
}
diff --git a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/storage/impl/VolatileRaftMetaStorage.java b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/storage/impl/VolatileRaftMetaStorage.java
new file mode 100644
index 000000000..90564fc9f
--- /dev/null
+++ b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/storage/impl/VolatileRaftMetaStorage.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.raft.jraft.storage.impl;
+
+import org.apache.ignite.raft.jraft.entity.PeerId;
+import org.apache.ignite.raft.jraft.option.RaftMetaStorageOptions;
+import org.apache.ignite.raft.jraft.storage.RaftMetaStorage;
+import org.apache.ignite.raft.jraft.storage.VolatileStorage;
+
+/**
+ * Volatile (in-memory) implementation of {@link RaftMetaStorage}. Used for Raft groups storing partition data of
+ * volatile storages.
+ */
+public class VolatileRaftMetaStorage implements RaftMetaStorage, VolatileStorage {
+ private volatile long term;
+
+ private volatile PeerId votedFor = PeerId.emptyPeer();
+
+ @Override
+ public boolean init(RaftMetaStorageOptions opts) {
+ return true;
+ }
+
+ @Override
+ public void shutdown() {
+ // no-op
+ }
+
+ @Override
+ public boolean setTerm(long term) {
+ this.term = term;
+
+ return true;
+ }
+
+ @Override
+ public long getTerm() {
+ return term;
+ }
+
+ @Override
+ public boolean setVotedFor(PeerId peerId) {
+ this.votedFor = peerId;
+
+ return true;
+ }
+
+ @Override
+ public PeerId getVotedFor() {
+ return votedFor;
+ }
+
+ @Override
+ public boolean setTermAndVotedFor(long term, PeerId peerId) {
+ this.term = term;
+ this.votedFor = peerId;
+
+ return true;
+ }
+}
diff --git a/modules/raft/src/test/java/org/apache/ignite/internal/raft/LozaTest.java b/modules/raft/src/test/java/org/apache/ignite/internal/raft/LozaTest.java
index 26857cafd..b13450a3c 100644
--- a/modules/raft/src/test/java/org/apache/ignite/internal/raft/LozaTest.java
+++ b/modules/raft/src/test/java/org/apache/ignite/internal/raft/LozaTest.java
@@ -17,6 +17,7 @@
package org.apache.ignite.internal.raft;
+import static org.apache.ignite.internal.raft.server.RaftGroupOptions.defaults;
import static org.junit.jupiter.api.Assertions.assertThrows;
import java.util.List;
@@ -77,8 +78,14 @@ public class LozaTest extends IgniteAbstractTest {
Supplier<RaftGroupListener> lsnrSupplier = () -> null;
- assertThrows(NodeStoppingException.class, () -> loza.updateRaftGroup(raftGroupId, nodes, newNodes, lsnrSupplier, () -> null));
+ assertThrows(
+ NodeStoppingException.class,
+ () -> loza.updateRaftGroup(raftGroupId, nodes, newNodes, lsnrSupplier, () -> null, defaults())
+ );
assertThrows(NodeStoppingException.class, () -> loza.stopRaftGroup(raftGroupId));
- assertThrows(NodeStoppingException.class, () -> loza.prepareRaftGroup(raftGroupId, nodes, lsnrSupplier));
+ assertThrows(
+ NodeStoppingException.class,
+ () -> loza.prepareRaftGroup(raftGroupId, nodes, lsnrSupplier, defaults())
+ );
}
}
diff --git a/modules/raft/src/test/java/org/apache/ignite/internal/raft/server/impl/RaftServerImpl.java b/modules/raft/src/test/java/org/apache/ignite/internal/raft/server/impl/RaftServerImpl.java
index 296fb0157..a6b92248e 100644
--- a/modules/raft/src/test/java/org/apache/ignite/internal/raft/server/impl/RaftServerImpl.java
+++ b/modules/raft/src/test/java/org/apache/ignite/internal/raft/server/impl/RaftServerImpl.java
@@ -28,6 +28,7 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.function.BiConsumer;
import org.apache.ignite.internal.raft.server.RaftGroupEventsListener;
+import org.apache.ignite.internal.raft.server.RaftGroupOptions;
import org.apache.ignite.internal.raft.server.RaftServer;
import org.apache.ignite.lang.IgniteLogger;
import org.apache.ignite.lang.IgniteStringFormatter;
@@ -173,7 +174,7 @@ public class RaftServerImpl implements RaftServer {
/** {@inheritDoc} */
@Override
public synchronized boolean startRaftGroup(String groupId, RaftGroupListener lsnr,
- List<Peer> initialConf) {
+ List<Peer> initialConf, RaftGroupOptions groupOptions) {
if (listeners.containsKey(groupId)) {
return false;
}
@@ -185,8 +186,14 @@ public class RaftServerImpl implements RaftServer {
/** {@inheritDoc} */
@Override
- public boolean startRaftGroup(String groupId, RaftGroupEventsListener evLsnr, RaftGroupListener lsnr, List<Peer> initialConf) {
- return startRaftGroup(groupId, lsnr, initialConf);
+ public boolean startRaftGroup(
+ String groupId,
+ RaftGroupEventsListener evLsnr,
+ RaftGroupListener lsnr,
+ List<Peer> initialConf,
+ RaftGroupOptions groupOptions
+ ) {
+ return startRaftGroup(groupId, lsnr, initialConf, groupOptions);
}
/** {@inheritDoc} */
diff --git a/modules/raft/src/test/java/org/apache/ignite/raft/jraft/core/TestJRaftServiceFactory.java b/modules/raft/src/test/java/org/apache/ignite/raft/jraft/core/TestJRaftServiceFactory.java
index 96ab9938a..4766cd419 100644
--- a/modules/raft/src/test/java/org/apache/ignite/raft/jraft/core/TestJRaftServiceFactory.java
+++ b/modules/raft/src/test/java/org/apache/ignite/raft/jraft/core/TestJRaftServiceFactory.java
@@ -27,7 +27,7 @@ public class TestJRaftServiceFactory extends DefaultJRaftServiceFactory {
@Override
public LogStorage createLogStorage(final String groupId, final RaftOptions raftOptions) {
- return new LocalLogStorage(null, raftOptions);
+ return new LocalLogStorage(raftOptions);
}
}
diff --git a/modules/raft/src/test/java/org/apache/ignite/raft/jraft/core/TestJRaftServiceFactory.java b/modules/raft/src/test/java/org/apache/ignite/raft/jraft/core/VolatileJRaftServiceFactoryTest.java
similarity index 55%
copy from modules/raft/src/test/java/org/apache/ignite/raft/jraft/core/TestJRaftServiceFactory.java
copy to modules/raft/src/test/java/org/apache/ignite/raft/jraft/core/VolatileJRaftServiceFactoryTest.java
index 96ab9938a..da74083e8 100644
--- a/modules/raft/src/test/java/org/apache/ignite/raft/jraft/core/TestJRaftServiceFactory.java
+++ b/modules/raft/src/test/java/org/apache/ignite/raft/jraft/core/VolatileJRaftServiceFactoryTest.java
@@ -14,20 +14,27 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package org.apache.ignite.raft.jraft.core;
import org.apache.ignite.raft.jraft.option.RaftOptions;
-import org.apache.ignite.raft.jraft.storage.LogStorage;
-import org.apache.ignite.raft.jraft.storage.impl.LocalLogStorage;
+import org.apache.ignite.raft.jraft.storage.VolatileStorage;
+import org.junit.jupiter.api.Test;
-public class TestJRaftServiceFactory extends DefaultJRaftServiceFactory {
- public TestJRaftServiceFactory() {
- super(null);
- }
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.instanceOf;
+import static org.hamcrest.Matchers.is;
- @Override
- public LogStorage createLogStorage(final String groupId, final RaftOptions raftOptions) {
- return new LocalLogStorage(null, raftOptions);
+class VolatileJRaftServiceFactoryTest {
+ private final VolatileJRaftServiceFactory serviceFactory = new VolatileJRaftServiceFactory();
+
+ @Test
+ void producesVolatileMetaStorage() {
+ assertThat(serviceFactory.createRaftMetaStorage("test", new RaftOptions()), is(instanceOf(VolatileStorage.class)));
}
-}
+ @Test
+ void producesVolatileLogStorage() {
+ assertThat(serviceFactory.createLogStorage("test", new RaftOptions()), is(instanceOf(VolatileStorage.class)));
+ }
+}
\ No newline at end of file
diff --git a/modules/raft/src/test/java/org/apache/ignite/raft/jraft/storage/impl/LocalLogStorageTest.java b/modules/raft/src/test/java/org/apache/ignite/raft/jraft/storage/impl/LocalLogStorageTest.java
index f689eb94c..e183a982e 100644
--- a/modules/raft/src/test/java/org/apache/ignite/raft/jraft/storage/impl/LocalLogStorageTest.java
+++ b/modules/raft/src/test/java/org/apache/ignite/raft/jraft/storage/impl/LocalLogStorageTest.java
@@ -22,6 +22,6 @@ import org.apache.ignite.raft.jraft.storage.LogStorage;
public class LocalLogStorageTest extends BaseLogStorageTest {
@Override
protected LogStorage newLogStorage() {
- return new LocalLogStorage(this.path.toString(), new RaftOptions());
+ return new LocalLogStorage(new RaftOptions());
}
}
diff --git a/modules/raft/src/test/java/org/apache/ignite/raft/jraft/storage/impl/LogManagerTest.java b/modules/raft/src/test/java/org/apache/ignite/raft/jraft/storage/impl/LogManagerTest.java
index 0ab6d0ba7..022fbabdd 100644
--- a/modules/raft/src/test/java/org/apache/ignite/raft/jraft/storage/impl/LogManagerTest.java
+++ b/modules/raft/src/test/java/org/apache/ignite/raft/jraft/storage/impl/LogManagerTest.java
@@ -107,7 +107,7 @@ public class LogManagerTest extends BaseStorageTest {
}
protected LogStorage newLogStorage(final RaftOptions raftOptions) {
- return new LocalLogStorage(this.path.toString(), raftOptions);
+ return new LocalLogStorage(raftOptions);
}
@AfterEach
diff --git a/modules/raft/src/test/java/org/apache/ignite/raft/jraft/storage/impl/VolatileRaftMetaStorageTest.java b/modules/raft/src/test/java/org/apache/ignite/raft/jraft/storage/impl/VolatileRaftMetaStorageTest.java
new file mode 100644
index 000000000..208818a22
--- /dev/null
+++ b/modules/raft/src/test/java/org/apache/ignite/raft/jraft/storage/impl/VolatileRaftMetaStorageTest.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.raft.jraft.storage.impl;
+
+import org.apache.ignite.raft.jraft.entity.PeerId;
+import org.apache.ignite.raft.jraft.option.RaftMetaStorageOptions;
+import org.apache.ignite.raft.jraft.storage.VolatileStorage;
+import org.junit.jupiter.api.Test;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.instanceOf;
+import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.nullValue;
+import static org.hamcrest.Matchers.sameInstance;
+import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+class VolatileRaftMetaStorageTest {
+ private final VolatileRaftMetaStorage storage = new VolatileRaftMetaStorage();
+
+ private final PeerId peerId = new PeerId("1.2.3.4", 4000);
+
+ @Test
+ void initReturnsTrue() {
+ assertTrue(storage.init(new RaftMetaStorageOptions()));
+ }
+
+ @Test
+ void shutdownDoesNothing() {
+ assertDoesNotThrow(storage::shutdown);
+ }
+
+ @Test
+ void returnsDefaultValuesWhenNothingSet() {
+ assertThat(storage.getTerm(), is(0L));
+ assertThat(storage.getVotedFor(), is(PeerId.emptyPeer()));
+ }
+
+ @Test
+ void setTermReturnsTrue() {
+ assertTrue(storage.setTerm(42));
+ }
+
+ @Test
+ void setsAndReturnsTerm() {
+ storage.setTerm(3);
+
+ assertThat(storage.getTerm(), is(3L));
+ }
+
+ @Test
+ void setVotedForReturnsTrue() {
+ assertTrue(storage.setVotedFor(peerId));
+ }
+
+ @Test
+ void setsAndReturnsPeer() {
+ storage.setVotedFor(peerId);
+
+ assertThat(storage.getVotedFor(), is(sameInstance(peerId)));
+ }
+
+ @Test
+ void setsTermAndVotedForTogether() {
+ storage.setTermAndVotedFor(4, peerId);
+
+ assertThat(storage.getTerm(), is(4L));
+ assertThat(storage.getVotedFor(), is(sameInstance(peerId)));
+ }
+
+ @Test
+ void isInstanceOfVolatileStorage() {
+ assertThat(storage, is(instanceOf(VolatileStorage.class)));
+ }
+}
\ No newline at end of file
diff --git a/modules/runner/pom.xml b/modules/runner/pom.xml
index f591a2e05..7ea282796 100644
--- a/modules/runner/pom.xml
+++ b/modules/runner/pom.xml
@@ -132,6 +132,12 @@
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>ca.seinesoftware</groupId>
+ <artifactId>hamcrest-path</artifactId>
+ <scope>test</scope>
+ </dependency>
+
<dependency>
<groupId>org.junit.jupiter</groupId>
<artifactId>junit-jupiter-api</artifactId>
diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/AbstractClusterIntegrationTest.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/AbstractClusterIntegrationTest.java
index 256f9e5b1..675d57556 100644
--- a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/AbstractClusterIntegrationTest.java
+++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/AbstractClusterIntegrationTest.java
@@ -18,6 +18,7 @@
package org.apache.ignite.internal;
import static java.util.stream.Collectors.toList;
+import static org.apache.ignite.internal.sql.engine.util.CursorUtils.getAllFromCursor;
import static org.apache.ignite.internal.testframework.IgniteTestUtils.testNodeName;
import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
import static org.hamcrest.MatcherAssert.assertThat;
@@ -68,7 +69,7 @@ public abstract class AbstractClusterIntegrationTest extends BaseIgniteAbstractT
/** Work directory. */
@WorkDirectory
- private static Path WORK_DIR;
+ protected static Path WORK_DIR;
/**
* Before all.
@@ -153,4 +154,10 @@ public abstract class AbstractClusterIntegrationTest extends BaseIgniteAbstractT
protected final IgniteImpl node(int index) {
return (IgniteImpl) clusterNodes.get(index);
}
+
+ protected List<List<Object>> executeSql(String sql, Object... args) {
+ return getAllFromCursor(
+ node(0).queryEngine().queryAsync("PUBLIC", sql, args).get(0).join()
+ );
+ }
}
diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/compute/ItComputeTest.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/compute/ItComputeTest.java
index a2dc8f327..2857a6938 100644
--- a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/compute/ItComputeTest.java
+++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/compute/ItComputeTest.java
@@ -19,7 +19,6 @@ package org.apache.ignite.internal.compute;
import static java.util.stream.Collectors.joining;
import static java.util.stream.Collectors.toList;
-import static org.apache.ignite.internal.sql.engine.util.CursorUtils.getAllFromCursor;
import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willBe;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.aMapWithSize;
@@ -279,12 +278,6 @@ class ItComputeTest extends AbstractClusterIntegrationTest {
assertThat(actualNodeName, in(allNodeNames()));
}
- private List<List<Object>> executeSql(String sql, Object... args) {
- return getAllFromCursor(
- node(0).queryEngine().queryAsync("PUBLIC", sql, args).get(0).join()
- );
- }
-
private static class ConcatJob implements ComputeJob<String> {
/** {@inheritDoc} */
@Override
diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/inmemory/ItRaftStorageVolatilenessTest.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/inmemory/ItRaftStorageVolatilenessTest.java
new file mode 100644
index 000000000..e187435e6
--- /dev/null
+++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/inmemory/ItRaftStorageVolatilenessTest.java
@@ -0,0 +1,209 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.inmemory;
+
+import static ca.seinesoftware.hamcrest.path.PathMatcher.exists;
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static java.util.stream.Collectors.toList;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.everyItem;
+import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.not;
+import static org.hamcrest.Matchers.startsWith;
+import static org.rocksdb.RocksDB.DEFAULT_COLUMN_FAMILY;
+
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.UUID;
+import java.util.stream.Stream;
+import org.apache.ignite.internal.AbstractClusterIntegrationTest;
+import org.apache.ignite.internal.app.IgniteImpl;
+import org.apache.ignite.internal.table.distributed.TableManager;
+import org.junit.jupiter.api.Test;
+import org.rocksdb.ColumnFamilyDescriptor;
+import org.rocksdb.ColumnFamilyHandle;
+import org.rocksdb.ReadOptions;
+import org.rocksdb.RocksDB;
+import org.rocksdb.RocksIterator;
+import org.rocksdb.Slice;
+
+/**
+ * Tests for making sure that RAFT groups corresponding to partition stores of in-memory tables use volatile
+ * storages for storing RAFT meta and RAFT log, while they are persistent for persistent storages.
+ */
+class ItRaftStorageVolatilenessTest extends AbstractClusterIntegrationTest {
+    /** Name of the table created by each test. */
+    private static final String TABLE_NAME = "test";
+
+    /**
+     * A check applied to a single column family of the Raft log RocksDB instance.
+     */
+    @FunctionalInterface
+    private interface ColumnFamilyCheck {
+        /**
+         * Verifies the contents of a column family.
+         *
+         * @param db Opened RocksDB instance.
+         * @param tablePartitionPrefix Key prefix shared by the partitions of the test table.
+         * @param cfHandle Handle of the column family to inspect.
+         */
+        void verify(RocksDB db, String tablePartitionPrefix, ColumnFamilyHandle cfHandle);
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    protected int nodes() {
+        return 1;
+    }
+
+    @Test
+    void raftMetaStorageIsVolatileForVolatilePartitions() {
+        createInMemoryTable();
+
+        IgniteImpl ignite = node(0);
+
+        assertThat(partitionRaftMetaPaths(ignite), everyItem(not(exists())));
+    }
+
+    /** Creates the test table using the {@code aimem} storage engine. */
+    private void createInMemoryTable() {
+        executeSql("CREATE TABLE " + TABLE_NAME + " (k int, v int, CONSTRAINT PK PRIMARY KEY (k)) ENGINE aimem");
+    }
+
+    /**
+     * Returns paths for 'meta' directories corresponding to Raft meta storages for partitions of the test table.
+     *
+     * @param ignite Ignite instance.
+     * @return Paths for 'meta' directories corresponding to Raft meta storages for partitions of the test table.
+     */
+    private List<Path> partitionRaftMetaPaths(IgniteImpl ignite) {
+        // Files.list() keeps a directory handle open, hence the try-with-resources.
+        try (Stream<Path> paths = Files.list(WORK_DIR.resolve(ignite.name()))) {
+            return paths
+                    .filter(path -> isPartitionDir(path, ignite))
+                    .map(path -> path.resolve("meta"))
+                    .collect(toList());
+        } catch (IOException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    /** Tells whether the last element of the path starts with the partition prefix of the test table. */
+    private boolean isPartitionDir(Path path, IgniteImpl ignite) {
+        return path.getFileName().toString().startsWith(testTablePartitionPrefix(ignite));
+    }
+
+    /** Builds the prefix {@code <tableId>_part_} for the test table. */
+    private String testTablePartitionPrefix(IgniteImpl ignite) {
+        return testTableId(ignite) + "_part_";
+    }
+
+    /** Resolves the ID of the test table through the node's table manager. */
+    private UUID testTableId(IgniteImpl ignite) {
+        TableManager tables = (TableManager) ignite.tables();
+        return tables.tableImpl("PUBLIC." + TABLE_NAME).tableId();
+    }
+
+    @Test
+    void raftLogStorageIsVolatileForVolatilePartitions() throws Exception {
+        createInMemoryTable();
+
+        stopNodeAndCheckRaftLog(this::assertThatFamilyHasNoDataForPartition);
+    }
+
+    /**
+     * Stops the only node, opens the RocksDB instance located in its {@code log} directory and applies the given
+     * check to every column family of that instance.
+     *
+     * @param check Check to run against each column family.
+     * @throws Exception If the node cannot be stopped or the database cannot be opened.
+     */
+    private void stopNodeAndCheckRaftLog(ColumnFamilyCheck check) throws Exception {
+        IgniteImpl ignite = node(0);
+        String nodeName = ignite.name();
+        String tablePartitionPrefix = testTablePartitionPrefix(ignite);
+
+        // NOTE(review): presumably the node has to be stopped so that it no longer holds the log database open.
+        node(0).close();
+
+        Path logRocksDbDir = WORK_DIR.resolve(nodeName).resolve("log");
+
+        List<ColumnFamilyDescriptor> cfDescriptors = List.of(
+                // Column family to store configuration log entry.
+                new ColumnFamilyDescriptor("Configuration".getBytes(UTF_8)),
+                // Default column family to store user data log entry.
+                new ColumnFamilyDescriptor(DEFAULT_COLUMN_FAMILY)
+        );
+
+        List<ColumnFamilyHandle> cfHandles = new ArrayList<>();
+
+        try (RocksDB db = RocksDB.open(logRocksDbDir.toString(), cfDescriptors, cfHandles)) {
+            try {
+                for (ColumnFamilyHandle cfHandle : cfHandles) {
+                    check.verify(db, tablePartitionPrefix, cfHandle);
+                }
+            } finally {
+                // Column family handles wrap native objects; release them before the database itself is closed.
+                cfHandles.forEach(ColumnFamilyHandle::close);
+            }
+        }
+    }
+
+    /**
+     * Asserts that the column family contains no key starting with the given prefix.
+     *
+     * @param db Opened RocksDB instance.
+     * @param tablePartitionPrefix Key prefix shared by the partitions of the test table.
+     * @param cfHandle Handle of the column family to inspect.
+     */
+    private void assertThatFamilyHasNoDataForPartition(RocksDB db, String tablePartitionPrefix, ColumnFamilyHandle cfHandle) {
+        // The slice is a native object that ReadOptions does not close, so it is declared first and therefore closed last.
+        try (
+                Slice lowerBound = new Slice(tablePartitionPrefix.getBytes(UTF_8));
+                ReadOptions readOptions = new ReadOptions().setIterateLowerBound(lowerBound);
+                RocksIterator iterator = db.newIterator(cfHandle, readOptions)
+        ) {
+            iterator.seekToFirst();
+
+            // With the lower bound set, the first visible key (if any) is the only candidate that may carry the prefix.
+            if (iterator.isValid()) {
+                String key = new String(iterator.key(), UTF_8);
+                assertThat(key, not(startsWith(tablePartitionPrefix)));
+            }
+        }
+    }
+
+    @Test
+    void raftMetaStorageIsPersistentForPersistentPartitions() {
+        createPersistentTable();
+
+        IgniteImpl ignite = node(0);
+
+        assertThat(partitionRaftMetaPaths(ignite), everyItem(exists()));
+    }
+
+    /** Creates the test table using the {@code rocksdb} storage engine. */
+    private void createPersistentTable() {
+        executeSql("CREATE TABLE " + TABLE_NAME + " (k int, v int, CONSTRAINT PK PRIMARY KEY (k)) ENGINE rocksdb");
+    }
+
+    @Test
+    void raftLogStorageIsPersistentForPersistentPartitions() throws Exception {
+        createPersistentTable();
+
+        stopNodeAndCheckRaftLog(this::assertThatFamilyHasDataForPartition);
+    }
+
+    /**
+     * Asserts that the column family contains at least one key starting with the given prefix.
+     *
+     * @param db Opened RocksDB instance.
+     * @param tablePartitionPrefix Key prefix shared by the partitions of the test table.
+     * @param cfHandle Handle of the column family to inspect.
+     */
+    private void assertThatFamilyHasDataForPartition(RocksDB db, String tablePartitionPrefix, ColumnFamilyHandle cfHandle) {
+        // The slice is a native object that ReadOptions does not close, so it is declared first and therefore closed last.
+        try (
+                Slice lowerBound = new Slice(tablePartitionPrefix.getBytes(UTF_8));
+                ReadOptions readOptions = new ReadOptions().setIterateLowerBound(lowerBound);
+                RocksIterator iterator = db.newIterator(cfHandle, readOptions)
+        ) {
+            iterator.seekToFirst();
+
+            assertThat(iterator.isValid(), is(true));
+
+            String key = new String(iterator.key(), UTF_8);
+            assertThat(key, startsWith(tablePartitionPrefix));
+        }
+    }
+
+    @Test
+    void inMemoryTableWorks() {
+        createInMemoryTable();
+
+        executeSql("INSERT INTO " + TABLE_NAME + "(k, v) VALUES (1, 101)");
+
+        List<List<Object>> tuples = executeSql("SELECT k, v FROM " + TABLE_NAME);
+
+        assertThat(tuples, equalTo(List.of(List.of(1, 101))));
+    }
+}
diff --git a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/MockedStructuresTest.java b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/MockedStructuresTest.java
index 5bf3f7485..7c43644c0 100644
--- a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/MockedStructuresTest.java
+++ b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/MockedStructuresTest.java
@@ -628,7 +628,7 @@ public class MockedStructuresTest extends IgniteAbstractTest {
* @return Table manager.
*/
private TableManager mockManagers() throws NodeStoppingException {
- when(rm.prepareRaftGroup(any(), any(), any())).thenAnswer(mock -> {
+ when(rm.prepareRaftGroup(any(), any(), any(), any())).thenAnswer(mock -> {
RaftGroupService raftGrpSrvcMock = mock(RaftGroupService.class);
when(raftGrpSrvcMock.leader()).thenReturn(new Peer(new NetworkAddress("localhost", 47500)));
@@ -636,7 +636,7 @@ public class MockedStructuresTest extends IgniteAbstractTest {
return completedFuture(raftGrpSrvcMock);
});
- when(rm.updateRaftGroup(any(), any(), any(), any(), any())).thenAnswer(mock -> {
+ when(rm.updateRaftGroup(any(), any(), any(), any(), any(), any())).thenAnswer(mock -> {
RaftGroupService raftGrpSrvcMock = mock(RaftGroupService.class);
when(raftGrpSrvcMock.leader()).thenReturn(new Peer(new NetworkAddress("localhost", 47500)));
diff --git a/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/engine/TableStorage.java b/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/engine/TableStorage.java
index cb6b9dd37..7ab3ec975 100644
--- a/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/engine/TableStorage.java
+++ b/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/engine/TableStorage.java
@@ -78,6 +78,13 @@ public interface TableStorage {
*/
void dropIndex(String indexName);
+ /**
+ * Returns {@code true} if this storage is volatile (i.e. stores its data in memory), or {@code false} if it's persistent.
+ *
+ * <p>The table manager passes this flag on when it builds Raft group options for the table.
+ *
+ * @return {@code true} if this storage is volatile (i.e. stores its data in memory), or {@code false} if it's persistent.
+ */
+ boolean isVolatile();
+
/**
* Returns the table configuration.
*/
diff --git a/modules/storage-api/src/test/java/org/apache/ignite/internal/storage/chm/TestConcurrentHashMapTableStorage.java b/modules/storage-api/src/test/java/org/apache/ignite/internal/storage/chm/TestConcurrentHashMapTableStorage.java
index d2c4eecbd..42c8705dd 100644
--- a/modules/storage-api/src/test/java/org/apache/ignite/internal/storage/chm/TestConcurrentHashMapTableStorage.java
+++ b/modules/storage-api/src/test/java/org/apache/ignite/internal/storage/chm/TestConcurrentHashMapTableStorage.java
@@ -85,6 +85,11 @@ public class TestConcurrentHashMapTableStorage implements TableStorage {
throw new UnsupportedOperationException("Not supported yet");
}
+ /** {@inheritDoc} */
+ @Override
+ public boolean isVolatile() {
+ return true;
+ }
+
/** {@inheritDoc} */
@Override
public TableConfiguration configuration() {
diff --git a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/AbstractPageMemoryTableStorage.java b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/AbstractPageMemoryTableStorage.java
index 6dff5ce59..058421613 100644
--- a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/AbstractPageMemoryTableStorage.java
+++ b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/AbstractPageMemoryTableStorage.java
@@ -20,13 +20,16 @@ package org.apache.ignite.internal.storage.pagememory;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
+import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicReferenceArray;
import org.apache.ignite.configuration.schemas.table.TableConfiguration;
import org.apache.ignite.configuration.schemas.table.TableView;
import org.apache.ignite.internal.pagememory.PageMemory;
+import org.apache.ignite.internal.storage.MvPartitionStorage;
import org.apache.ignite.internal.storage.PartitionStorage;
import org.apache.ignite.internal.storage.StorageException;
+import org.apache.ignite.internal.storage.engine.MvTableStorage;
import org.apache.ignite.internal.storage.engine.TableStorage;
import org.apache.ignite.internal.storage.index.SortedIndexStorage;
import org.apache.ignite.internal.storage.pagememory.mv.PageMemoryMvPartitionStorage;
@@ -39,7 +42,7 @@ import org.jetbrains.annotations.TestOnly;
* Abstract table storage implementation based on {@link PageMemory}.
*/
// TODO: IGNITE-16642 Support indexes.
-public abstract class AbstractPageMemoryTableStorage implements TableStorage {
+public abstract class AbstractPageMemoryTableStorage implements TableStorage, MvTableStorage {
protected final TableConfiguration tableCfg;
/** List of objects to be closed on the {@link #stop}. */
@@ -49,6 +52,8 @@ public abstract class AbstractPageMemoryTableStorage implements TableStorage {
protected volatile AtomicReferenceArray<PartitionStorage> partitions;
+ protected volatile AtomicReferenceArray<MvPartitionStorage> mvPartitions;
+
/**
* Constructor.
*
@@ -71,6 +76,8 @@ public abstract class AbstractPageMemoryTableStorage implements TableStorage {
partitions = new AtomicReferenceArray<>(tableView.partitions());
+ mvPartitions = new AtomicReferenceArray<>(tableView.partitions());
+
started = true;
}
@@ -147,6 +154,57 @@ public abstract class AbstractPageMemoryTableStorage implements TableStorage {
*/
protected abstract VolatilePageMemoryPartitionStorage createPartitionStorage(int partId) throws StorageException;
+ /**
+  * {@inheritDoc}
+  *
+  * <p>Returns the storage already registered under the given id, or creates a new one and registers it.
+  * NOTE(review): the lookup and the registration are two separate steps, so concurrent callers for the same id
+  * may each create a storage; confirm that callers serialize access.
+  */
+ @Override
+ public MvPartitionStorage getOrCreateMvPartition(int partitionId) throws StorageException {
+     MvPartitionStorage existing = getMvPartition(partitionId);
+
+     if (existing == null) {
+         MvPartitionStorage created = createMvPartitionStorage(partitionId);
+
+         mvPartitions.set(partitionId, created);
+
+         return created;
+     }
+
+     return existing;
+ }
+
+ /**
+  * {@inheritDoc}
+  *
+  * <p>Returns whatever is currently registered under the given id, which is {@code null} when nothing has been
+  * registered.
+  *
+  * @throws IllegalArgumentException If the id is negative or not less than the number of partition slots.
+  */
+ @Override
+ public MvPartitionStorage getMvPartition(int partitionId) {
+     assert started : "Storage has not started yet";
+
+     if (partitionId >= 0 && partitionId < mvPartitions.length()) {
+         return mvPartitions.get(partitionId);
+     }
+
+     String message = S.toString(
+             "Unable to access partition with id outside of configured range",
+             "table", tableCfg.name().value(), false,
+             "partitionId", partitionId, false,
+             "partitions", mvPartitions.length(), false
+     );
+
+     throw new IllegalArgumentException(message);
+ }
+
+ /**
+  * {@inheritDoc}
+  *
+  * <p>Currently this only clears the slot of the given partition; the returned future is already completed.
+  */
+ @Override
+ public CompletableFuture<?> destroyPartition(int partitionId) throws StorageException {
+     assert started : "Storage has not started yet";
+
+     if (getMvPartition(partitionId) != null) {
+         // Only the reference is dropped for now.
+         // TODO: IGNITE-17197 Actually destroy the partition.
+         mvPartitions.set(partitionId, null);
+     }
+
+     // TODO: IGNITE-17197 Convert this to true async code.
+     return CompletableFuture.completedFuture(null);
+ }
+
/**
* This API is not yet ready. But we need to test mv storages anyways.
*/
diff --git a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/PersistentPageMemoryTableStorage.java b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/PersistentPageMemoryTableStorage.java
index 4f5b6e70a..3d9e57d47 100644
--- a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/PersistentPageMemoryTableStorage.java
+++ b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/PersistentPageMemoryTableStorage.java
@@ -56,6 +56,11 @@ class PersistentPageMemoryTableStorage extends AbstractPageMemoryTableStorage {
this.dataRegion = dataRegion;
}
+ /** {@inheritDoc} */
+ @Override
+ public boolean isVolatile() {
+ return false;
+ }
+
/** {@inheritDoc} */
@Override
public void start() throws StorageException {
diff --git a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/VolatilePageMemoryTableStorage.java b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/VolatilePageMemoryTableStorage.java
index 503c5e6da..9cffb7c37 100644
--- a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/VolatilePageMemoryTableStorage.java
+++ b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/VolatilePageMemoryTableStorage.java
@@ -60,6 +60,11 @@ class VolatilePageMemoryTableStorage extends AbstractPageMemoryTableStorage {
);
}
+ /** {@inheritDoc} */
+ @Override
+ public boolean isVolatile() {
+ return true;
+ }
+
/** {@inheritDoc} */
@Override
public void destroy() throws StorageException {
diff --git a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbTableStorage.java b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbTableStorage.java
index 8315fb3a4..8a94317ca 100644
--- a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbTableStorage.java
+++ b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbTableStorage.java
@@ -312,6 +312,11 @@ class RocksDbTableStorage implements TableStorage, MvTableStorage {
});
}
+ /** {@inheritDoc} */
+ @Override
+ public boolean isVolatile() {
+ return false;
+ }
+
/**
* Checks that a passed partition id is within the proper bounds.
*
diff --git a/modules/storage-rocksdb/src/test/java/org/apache/ignite/internal/storage/rocksdb/RocksDbTableStorageTest.java b/modules/storage-rocksdb/src/test/java/org/apache/ignite/internal/storage/rocksdb/RocksDbTableStorageTest.java
index 49362bda4..23c86dec5 100644
--- a/modules/storage-rocksdb/src/test/java/org/apache/ignite/internal/storage/rocksdb/RocksDbTableStorageTest.java
+++ b/modules/storage-rocksdb/src/test/java/org/apache/ignite/internal/storage/rocksdb/RocksDbTableStorageTest.java
@@ -298,4 +298,9 @@ public class RocksDbTableStorageTest {
assertThat(partitionStorage1.read(overwriteData), is(testData));
assertThat(partitionStorage2.read(overwriteData), is(testData));
}
+
+ @Test
+ void storageAdvertisesItIsPersistent() {
+     boolean reportedAsVolatile = storage.isVolatile();
+
+     // The storage under test must not report itself as volatile.
+     assertThat(reportedAsVolatile, is(false));
+ }
}
diff --git a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItInternalTableScanTest.java b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItInternalTableScanTest.java
index 70be355f2..b9d0fd0eb 100644
--- a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItInternalTableScanTest.java
+++ b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItInternalTableScanTest.java
@@ -47,6 +47,7 @@ import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.apache.ignite.internal.raft.Loza;
+import org.apache.ignite.internal.raft.server.RaftGroupOptions;
import org.apache.ignite.internal.raft.server.RaftServer;
import org.apache.ignite.internal.raft.server.impl.RaftServerImpl;
import org.apache.ignite.internal.schema.BinaryRow;
@@ -160,7 +161,8 @@ public class ItInternalTableScanTest {
raftSrv.startRaftGroup(
grpName,
new PartitionListener(tblId, new VersionedRowStore(mockStorage, txManager)),
- conf
+ conf,
+ RaftGroupOptions.defaults()
);
executor = new ScheduledThreadPoolExecutor(20, new NamedThreadFactory(Loza.CLIENT_POOL_NAME));
diff --git a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTxDistributedTestSingleNode.java b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTxDistributedTestSingleNode.java
index d5e46ab37..84bd2f393 100644
--- a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTxDistributedTestSingleNode.java
+++ b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTxDistributedTestSingleNode.java
@@ -36,6 +36,7 @@ import java.util.function.Function;
import java.util.stream.Collectors;
import org.apache.ignite.internal.affinity.RendezvousAffinityFunction;
import org.apache.ignite.internal.raft.Loza;
+import org.apache.ignite.internal.raft.server.RaftGroupOptions;
import org.apache.ignite.internal.raft.server.impl.JraftServerImpl;
import org.apache.ignite.internal.storage.basic.TestMvPartitionStorage;
import org.apache.ignite.internal.storage.engine.TableStorage;
@@ -275,7 +276,8 @@ public class ItTxDistributedTestSingleNode extends TxAbstractTest {
grpId,
partNodes,
() -> new PartitionListener(tblId,
- new VersionedRowStore(new TestMvPartitionStorage(List.of(), 0), txManagers.get(node)))
+ new VersionedRowStore(new TestMvPartitionStorage(List.of(), 0), txManagers.get(node))),
+ RaftGroupOptions.defaults()
);
}
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
index ff07d7806..23f99d201 100644
--- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
@@ -79,6 +79,7 @@ import org.apache.ignite.internal.metastorage.client.WatchEvent;
import org.apache.ignite.internal.metastorage.client.WatchListener;
import org.apache.ignite.internal.raft.Loza;
import org.apache.ignite.internal.raft.server.RaftGroupEventsListener;
+import org.apache.ignite.internal.raft.server.RaftGroupOptions;
import org.apache.ignite.internal.schema.SchemaDescriptor;
import org.apache.ignite.internal.schema.SchemaManager;
import org.apache.ignite.internal.schema.SchemaUtils;
@@ -480,7 +481,8 @@ public class TableManager extends Producer<TableEvent, TableEventParameters> imp
partId,
busyLock,
movePartition(() -> internalTbl.partitionRaftGroupService(partId)),
- rebalanceScheduler)
+ rebalanceScheduler),
+ groupOptionsForInternalTable(internalTbl)
).thenAccept(
updatedRaftGroupService -> ((InternalTableImpl) internalTbl)
.updateInternalTableRaftGroupService(partId, updatedRaftGroupService)
@@ -500,6 +502,10 @@ public class TableManager extends Producer<TableEvent, TableEventParameters> imp
CompletableFuture.allOf(futures).join();
}
+ /**
+  * Builds Raft group options for the partitions of a table from the volatility flag of the table's storage.
+  *
+  * @param internalTbl Internal table whose storage is inspected.
+  * @return Raft group options for that table.
+  */
+ private RaftGroupOptions groupOptionsForInternalTable(InternalTable internalTbl) {
+     boolean volatileStorage = internalTbl.storage().isVolatile();
+
+     return RaftGroupOptions.forTable(volatileStorage);
+ }
+
/** {@inheritDoc} */
@Override
public void stop() {
@@ -1291,8 +1297,14 @@ public class TableManager extends Producer<TableEvent, TableEventParameters> imp
+ "check if current node={} should start new raft group node for partition rebalance.",
pendingAssignmentsWatchEvent.key(), part, tbl.name(), localMember.address());
- raftMgr.startRaftGroupNode(partId, assignments, deltaPeers, raftGrpLsnrSupplier,
- raftGrpEvtsLsnrSupplier);
+ raftMgr.startRaftGroupNode(
+ partId,
+ assignments,
+ deltaPeers,
+ raftGrpLsnrSupplier,
+ raftGrpEvtsLsnrSupplier,
+ groupOptionsForInternalTable(tbl.internalTable())
+ );
} catch (NodeStoppingException e) {
// no-op
}
diff --git a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerTest.java b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerTest.java
index 568e2e049..06cd02022 100644
--- a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerTest.java
+++ b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerTest.java
@@ -270,7 +270,7 @@ public class TableManagerTest extends IgniteAbstractTest {
*/
@Test
public void testPreconfiguredTable() throws Exception {
- when(rm.updateRaftGroup(any(), any(), any(), any(), any())).thenAnswer(mock ->
+ when(rm.updateRaftGroup(any(), any(), any(), any(), any(), any())).thenAnswer(mock ->
CompletableFuture.completedFuture(mock(RaftGroupService.class)));
TableManager tableManager = createTableManager(tblManagerFut, false);
@@ -446,7 +446,7 @@ public class TableManagerTest extends IgniteAbstractTest {
mockManagersAndCreateTable(scmTbl, tblManagerFut);
- verify(rm, times(PARTITIONS)).updateRaftGroup(anyString(), any(), any(), any(), any());
+ verify(rm, times(PARTITIONS)).updateRaftGroup(anyString(), any(), any(), any(), any(), any());
TableManager tableManager = tblManagerFut.join();
@@ -555,7 +555,7 @@ public class TableManagerTest extends IgniteAbstractTest {
CompletableFuture<TableManager> tblManagerFut,
Phaser phaser
) throws Exception {
- when(rm.updateRaftGroup(any(), any(), any(), any(), any())).thenAnswer(mock -> {
+ when(rm.updateRaftGroup(any(), any(), any(), any(), any(), any())).thenAnswer(mock -> {
RaftGroupService raftGrpSrvcMock = mock(RaftGroupService.class);
when(raftGrpSrvcMock.leader()).thenReturn(new Peer(new NetworkAddress("localhost", 47500)));
diff --git a/parent/pom.xml b/parent/pom.xml
index d69aea3a3..4da425afd 100644
--- a/parent/pom.xml
+++ b/parent/pom.xml
@@ -81,6 +81,7 @@
<typesafe.version>1.4.1</typesafe.version>
<hamcrest.version>2.2</hamcrest.version>
<hamcrest.optional.version>2.0.0</hamcrest.optional.version>
+ <hamcrest.path.version>1.0.1</hamcrest.path.version>
<scalecube.version>2.6.12</scalecube.version>
<calcite.version>1.30.0</calcite.version>
<immutables.version>2.8.8</immutables.version>
@@ -806,6 +807,18 @@
</exclusions>
</dependency>
+ <!-- Hamcrest matchers for java.nio.file.Path (PathMatcher.exists() is used by integration tests). -->
+ <!-- NOTE(review): hamcrest-core is presumably excluded to avoid clashing with the Hamcrest version managed above; confirm. -->
+ <dependency>
+ <groupId>ca.seinesoftware</groupId>
+ <artifactId>hamcrest-path</artifactId>
+ <version>${hamcrest.path.version}</version>
+ <exclusions>
+ <exclusion>
+ <groupId>org.hamcrest</groupId>
+ <artifactId>hamcrest-core</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+
<dependency>
<groupId>io.scalecube</groupId>
<artifactId>scalecube-cluster</artifactId>