You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by vp...@apache.org on 2022/10/24 10:28:41 UTC
[ignite-3] branch ignite-3.0.0-beta1 updated: ignite-17759 Need to pass commitTableId and commitPartitionId to MvPartitionStorage#addWrite (#1225)
This is an automated email from the ASF dual-hosted git repository.
vpyatkov pushed a commit to branch ignite-3.0.0-beta1
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/ignite-3.0.0-beta1 by this push:
new 6e1d1d6a73 ignite-17759 Need to pass commitTableId and commitPartitionId to MvPartitionStorage#addWrite (#1225)
6e1d1d6a73 is described below
commit 6e1d1d6a73ec2dc24d69758cd2b6a0c7e196d4df
Author: Vladislav Pyatkov <vl...@gmail.com>
AuthorDate: Mon Oct 24 13:19:30 2022 +0300
ignite-17759 Need to pass commitTableId and commitPartitionId to MvPartitionStorage#addWrite (#1225)
(cherry picked from commit f4d526eb831543aae1cd79bb5f3b948f3c80d7a1)
---
.../org/apache/ignite/client/fakes/FakeIgnite.java | 15 ++-
.../management/raft/ItCmgRaftServiceTest.java | 7 +-
.../management/ClusterManagementGroupManager.java | 10 +-
.../internal/cluster/management/CmgGroupId.java} | 28 +++-
.../internal/replicator/ReplicationGroupId.java} | 14 +-
.../org/apache/ignite/lang/IgniteFiveFunction.java | 51 +++++++
.../client/ItMetaStorageRaftGroupTest.java | 24 ++--
.../ItMetaStorageServicePersistenceTest.java | 4 +-
.../client/ItMetaStorageServiceTest.java | 11 +-
.../metastorage/common/MetastorageGroupId.java} | 28 +++-
.../internal/metastorage/MetaStorageManager.java | 8 +-
.../raft/client/service/RaftGroupService.java | 3 +-
.../apache/ignite/internal/raft/ItLozaTest.java | 39 +++++-
.../internal/raft/ItRaftGroupServiceTest.java | 37 ++++-
.../raft/server/ItJraftCounterServerTest.java | 34 ++---
.../ignite/raft/server/ItJraftHlcServerTest.java | 13 +-
.../raft/server/ItSimpleCounterServerTest.java | 4 +-
.../ignite/raft/server/RaftServerAbstractTest.java | 35 +++++
.../java/org/apache/ignite/internal/raft/Loza.java | 15 ++-
.../ignite/internal/raft/server/RaftServer.java | 11 +-
.../internal/raft/server/impl/JraftServerImpl.java | 31 +++--
.../raft/jraft/rpc/impl/RaftGroupServiceImpl.java | 16 ++-
.../org/apache/ignite/internal/raft/LozaTest.java | 37 ++++-
.../raft/jraft/core/RaftGroupServiceTest.java | 150 ++++++++++-----------
.../internal/raft/server/impl/RaftServerImpl.java | 20 ++-
.../service/ItAbstractListenerSnapshotTest.java | 41 +++++-
.../apache/ignite/internal/replicator/Replica.java | 4 +-
.../ignite/internal/replicator/ReplicaManager.java | 23 +++-
.../ReplicaIsAlreadyStartedException.java | 3 +-
.../exception/ReplicaUnavailableException.java | 3 +-
.../replicator/exception/ReplicationException.java | 5 +-
.../exception/ReplicationTimeoutException.java | 3 +-
.../replicator/message/ReplicaRequest.java | 5 +-
.../storage/ItRebalanceDistributedTest.java | 10 +-
.../app/ItIgniteInMemoryNodeRestartTest.java | 32 ++++-
.../ignite/distributed/ItTablePersistenceTest.java | 4 +-
.../distributed/ItTxDistributedTestSingleNode.java | 21 +--
.../ignite/internal/table/ItColocationTest.java | 25 +++-
.../internal/table/distributed/TableManager.java | 74 +++++-----
.../table/distributed/command/FinishTxCommand.java | 7 +-
.../distributed/command/UpdateAllCommand.java | 33 ++++-
.../table/distributed/command/UpdateCommand.java | 27 +++-
.../table/distributed/raft/PartitionListener.java | 11 +-
.../raft/RebalanceRaftGroupEventsListener.java | 5 +-
.../request/ReadWriteMultiRowReplicaRequest.java | 9 ++
.../request/ReadWriteSingleRowReplicaRequest.java | 9 ++
.../request/ReadWriteSwapRowReplicaRequest.java | 9 ++
.../replicator/PartitionReplicaListener.java | 92 +++++++------
.../distributed/replicator/PlacementDriver.java | 9 +-
.../distributed/replicator/TablePartitionId.java | 88 ++++++++++++
.../distributed/storage/InternalTableImpl.java | 80 +++++++----
.../ignite/internal/utils/RebalanceUtil.java | 25 ++--
.../ignite/internal/table/TxAbstractTest.java | 6 +-
.../apache/ignite/internal/table/TxLocalTest.java | 5 +-
.../table/distributed/TableManagerTest.java | 14 +-
.../PartitionRaftCommandsSerializationTest.java | 19 ++-
.../raft/PartitionCommandListenerTest.java | 19 ++-
.../replication/PartitionReplicaListenerTest.java | 5 +-
.../table/impl/DummyInternalTableImpl.java | 17 +--
.../ignite/internal/tx/InternalTransaction.java | 24 +++-
.../org/apache/ignite/internal/tx/TxManager.java | 7 +-
.../java/org/apache/ignite/internal/tx/TxMeta.java | 7 +-
.../ignite/internal/tx/impl/TransactionImpl.java | 39 ++++--
.../ignite/internal/tx/impl/TxManagerImpl.java | 8 +-
.../tx/message/TxFinishReplicaRequest.java | 3 +-
.../apache/ignite/internal/tx/TxManagerTest.java | 43 +++++-
.../storage/state/TxStateStorageAbstractTest.java | 45 ++++++-
67 files changed, 1103 insertions(+), 460 deletions(-)
diff --git a/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeIgnite.java b/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeIgnite.java
index 039f49db25..fcaa85853f 100644
--- a/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeIgnite.java
+++ b/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeIgnite.java
@@ -22,6 +22,7 @@ import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import org.apache.ignite.Ignite;
import org.apache.ignite.compute.IgniteCompute;
+import org.apache.ignite.internal.replicator.ReplicationGroupId;
import org.apache.ignite.internal.sql.engine.QueryProcessor;
import org.apache.ignite.internal.tx.InternalTransaction;
import org.apache.ignite.internal.tx.TxState;
@@ -94,7 +95,7 @@ public class FakeIgnite implements Ignite {
}
@Override
- public IgniteBiTuple<ClusterNode, Long> enlistedNodeAndTerm(String partGroupId) {
+ public IgniteBiTuple<ClusterNode, Long> enlistedNodeAndTerm(ReplicationGroupId replicationGroupId) {
return null;
}
@@ -103,9 +104,19 @@ public class FakeIgnite implements Ignite {
return null;
}
+ @Override
+ public boolean assignCommitPartition(ReplicationGroupId replicationGroupId) {
+ return false;
+ }
+
+ @Override
+ public ReplicationGroupId commitPartition() {
+ return null;
+ }
+
@Override
public IgniteBiTuple<ClusterNode, Long> enlist(
- String replicationGroupId,
+ ReplicationGroupId replicationGroupId,
IgniteBiTuple<ClusterNode, Long> nodeAndTerm) {
return null;
}
diff --git a/modules/cluster-management/src/integrationTest/java/org/apache/ignite/internal/cluster/management/raft/ItCmgRaftServiceTest.java b/modules/cluster-management/src/integrationTest/java/org/apache/ignite/internal/cluster/management/raft/ItCmgRaftServiceTest.java
index 2534582661..abe50a0997 100644
--- a/modules/cluster-management/src/integrationTest/java/org/apache/ignite/internal/cluster/management/raft/ItCmgRaftServiceTest.java
+++ b/modules/cluster-management/src/integrationTest/java/org/apache/ignite/internal/cluster/management/raft/ItCmgRaftServiceTest.java
@@ -19,6 +19,7 @@ package org.apache.ignite.internal.cluster.management.raft;
import static org.apache.ignite.internal.cluster.management.ClusterState.clusterState;
import static org.apache.ignite.internal.cluster.management.ClusterTag.clusterTag;
+import static org.apache.ignite.internal.cluster.management.CmgGroupId.INSTANCE;
import static org.apache.ignite.internal.raft.server.RaftGroupOptions.defaults;
import static org.apache.ignite.internal.testframework.IgniteTestUtils.assertThrowsWithCause;
import static org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCondition;
@@ -72,8 +73,6 @@ import org.junit.jupiter.api.extension.ExtendWith;
@ExtendWith(WorkDirectoryExtension.class)
@ExtendWith(ConfigurationExtension.class)
public class ItCmgRaftServiceTest {
- private static final String TEST_GROUP = "test_group";
-
@InjectConfiguration
private static RaftConfiguration raftConfiguration;
@@ -105,7 +104,7 @@ public class ItCmgRaftServiceTest {
raftStorage.start();
CompletableFuture<RaftGroupService> raftService = raftManager.prepareRaftGroup(
- TEST_GROUP,
+ INSTANCE,
List.copyOf(clusterService.topologyService().allMembers()),
() -> new CmgRaftGroupListener(raftStorage),
defaults()
@@ -121,7 +120,7 @@ public class ItCmgRaftServiceTest {
void beforeNodeStop() {
try {
- raftManager.stopRaftGroup(TEST_GROUP);
+ raftManager.stopRaftGroup(INSTANCE);
} catch (NodeStoppingException e) {
throw new RuntimeException(e);
}
diff --git a/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/ClusterManagementGroupManager.java b/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/ClusterManagementGroupManager.java
index dad388bc73..ee0eaba79b 100644
--- a/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/ClusterManagementGroupManager.java
+++ b/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/ClusterManagementGroupManager.java
@@ -22,6 +22,7 @@ import static java.util.concurrent.CompletableFuture.failedFuture;
import static java.util.stream.Collectors.toSet;
import static java.util.stream.Collectors.toUnmodifiableSet;
import static org.apache.ignite.internal.cluster.management.ClusterTag.clusterTag;
+import static org.apache.ignite.internal.cluster.management.CmgGroupId.INSTANCE;
import static org.apache.ignite.network.util.ClusterServiceUtils.resolveNodes;
import java.util.Collection;
@@ -85,9 +86,6 @@ public class ClusterManagementGroupManager implements IgniteComponent {
private static final IgniteLogger LOG = Loggers.forClass(ClusterManagementGroupManager.class);
- /** CMG Raft group name. */
- private static final String CMG_RAFT_GROUP_NAME = "cmg_raft_group";
-
/** Busy lock to stop synchronously. */
private final IgniteSpinBusyLock busyLock = new IgniteSpinBusyLock();
@@ -384,7 +382,7 @@ public class ClusterManagementGroupManager implements IgniteComponent {
raftService = null;
}
- raftManager.stopRaftGroup(CMG_RAFT_GROUP_NAME);
+ raftManager.stopRaftGroup(INSTANCE);
if (clusterStateStorage.isStarted()) {
clusterStateStorage.destroy();
@@ -472,7 +470,7 @@ public class ClusterManagementGroupManager implements IgniteComponent {
try {
return raftManager
.prepareRaftGroup(
- CMG_RAFT_GROUP_NAME,
+ INSTANCE,
resolveNodes(clusterService, nodeNames),
() -> {
clusterStateStorage.start();
@@ -616,7 +614,7 @@ public class ClusterManagementGroupManager implements IgniteComponent {
IgniteUtils.shutdownAndAwaitTermination(scheduledExecutor, 10, TimeUnit.SECONDS);
IgniteUtils.closeAll(
- () -> raftManager.stopRaftGroup(CMG_RAFT_GROUP_NAME),
+ () -> raftManager.stopRaftGroup(INSTANCE),
clusterStateStorage
);
diff --git a/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/message/ReplicaRequest.java b/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/CmgGroupId.java
similarity index 59%
copy from modules/replicator/src/main/java/org/apache/ignite/internal/replicator/message/ReplicaRequest.java
copy to modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/CmgGroupId.java
index 482f6fea3e..4e2b6d3ab0 100644
--- a/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/message/ReplicaRequest.java
+++ b/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/CmgGroupId.java
@@ -15,18 +15,32 @@
* limitations under the License.
*/
-package org.apache.ignite.internal.replicator.message;
+package org.apache.ignite.internal.cluster.management;
-import org.apache.ignite.network.NetworkMessage;
+import org.apache.ignite.internal.replicator.ReplicationGroupId;
/**
- * Replica request.
+ * CMG replication group id.
*/
-public interface ReplicaRequest extends NetworkMessage {
+public enum CmgGroupId implements ReplicationGroupId {
+ /** CMG group id. */
+ INSTANCE("cmg_group");
+
+ /** Group id string representation. */
+ private String name;
+
/**
- * Gets a replication group id.
+ * The constructor.
*
- * @return Replication group id.
+ * @param name The string representation of the enum.
*/
- String groupId();
+ CmgGroupId(String name) {
+ this.name = name;
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public String toString() {
+ return name;
+ }
}
diff --git a/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/message/ReplicaRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/replicator/ReplicationGroupId.java
similarity index 73%
copy from modules/replicator/src/main/java/org/apache/ignite/internal/replicator/message/ReplicaRequest.java
copy to modules/core/src/main/java/org/apache/ignite/internal/replicator/ReplicationGroupId.java
index 482f6fea3e..cc3fb4dcb5 100644
--- a/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/message/ReplicaRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/replicator/ReplicationGroupId.java
@@ -15,18 +15,12 @@
* limitations under the License.
*/
-package org.apache.ignite.internal.replicator.message;
+package org.apache.ignite.internal.replicator;
-import org.apache.ignite.network.NetworkMessage;
+import java.io.Serializable;
/**
- * Replica request.
+ * The interface represents a replication group identifier.
*/
-public interface ReplicaRequest extends NetworkMessage {
- /**
- * Gets a replication group id.
- *
- * @return Replication group id.
- */
- String groupId();
+public interface ReplicationGroupId extends Serializable {
}
diff --git a/modules/core/src/main/java/org/apache/ignite/lang/IgniteFiveFunction.java b/modules/core/src/main/java/org/apache/ignite/lang/IgniteFiveFunction.java
new file mode 100644
index 0000000000..2372f91e26
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/lang/IgniteFiveFunction.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.lang;
+
+import java.util.function.Function;
+
+/**
+ * Represents a function that accepts five arguments and produces a result.
+ * This is the five-arity specialization of {@link Function}.
+ *
+ * <p>This is a <a href="package-summary.html">functional interface</a>
+ * whose functional method is {@link #apply(Object, Object, Object, Object, Object)}.
+ *
+ * @param <T> the type of the first argument to the function
+ * @param <U> the type of the second argument to the function
+ * @param <V> the type of the third argument to the function
+ * @param <M> the type of the fourth argument to the function
+ * @param <N> the type of the fifth argument to the function
+ * @param <R> the type of the result of the function
+ *
+ * @see Function
+ */
+@FunctionalInterface
+public interface IgniteFiveFunction<T, U, V, M, N, R> {
+ /**
+ * Applies this function to the given arguments.
+ *
+ * @param t the first function argument
+ * @param u the second function argument
+ * @param v the third function argument
+ * @param m the fourth function argument
+ * @param n the fifth function argument
+ * @return the function result
+ */
+ R apply(T t, U u, V v, M m, N n);
+}
diff --git a/modules/metastorage-client/src/integrationTest/java/org/apache/ignite/internal/metastorage/client/ItMetaStorageRaftGroupTest.java b/modules/metastorage-client/src/integrationTest/java/org/apache/ignite/internal/metastorage/client/ItMetaStorageRaftGroupTest.java
index 0284581c20..d68e124edb 100644
--- a/modules/metastorage-client/src/integrationTest/java/org/apache/ignite/internal/metastorage/client/ItMetaStorageRaftGroupTest.java
+++ b/modules/metastorage-client/src/integrationTest/java/org/apache/ignite/internal/metastorage/client/ItMetaStorageRaftGroupTest.java
@@ -17,6 +17,7 @@
package org.apache.ignite.internal.metastorage.client;
+import static org.apache.ignite.internal.metastorage.common.MetastorageGroupId.INSTANCE;
import static org.apache.ignite.internal.raft.server.RaftGroupOptions.defaults;
import static org.apache.ignite.raft.jraft.test.TestUtils.waitForTopology;
import static org.apache.ignite.utils.ClusterServiceTestUtils.findLocalAddresses;
@@ -86,9 +87,6 @@ public class ItMetaStorageRaftGroupTest {
/** Nodes. */
private static final int NODES = 3;
- /** Meta Storage raft group name. */
- private static final String METASTORAGE_RAFT_GROUP_NAME = "METASTORAGE_RAFT_GROUP";
-
/** Factory. */
private static final RaftMessagesFactory FACTORY = new RaftMessagesFactory();
@@ -192,19 +190,19 @@ public class ItMetaStorageRaftGroupTest {
@AfterEach
public void afterTest() throws Exception {
if (metaStorageRaftSrv3 != null) {
- metaStorageRaftSrv3.stopRaftGroup(METASTORAGE_RAFT_GROUP_NAME);
+ metaStorageRaftSrv3.stopRaftGroup(INSTANCE);
metaStorageRaftSrv3.stop();
metaStorageRaftGrpSvc3.shutdown();
}
if (metaStorageRaftSrv2 != null) {
- metaStorageRaftSrv2.stopRaftGroup(METASTORAGE_RAFT_GROUP_NAME);
+ metaStorageRaftSrv2.stopRaftGroup(INSTANCE);
metaStorageRaftSrv2.stop();
metaStorageRaftGrpSvc2.shutdown();
}
if (metaStorageRaftSrv1 != null) {
- metaStorageRaftSrv1.stopRaftGroup(METASTORAGE_RAFT_GROUP_NAME);
+ metaStorageRaftSrv1.stopRaftGroup(INSTANCE);
metaStorageRaftSrv1.stop();
metaStorageRaftGrpSvc1.shutdown();
}
@@ -276,7 +274,7 @@ public class ItMetaStorageRaftGroupTest {
() -> replicatorStartedCounter.get() == 2, 5_000), replicatorStartedCounter.get() + "");
//stop leader
- oldLeaderServer.get().stopRaftGroup(METASTORAGE_RAFT_GROUP_NAME);
+ oldLeaderServer.get().stopRaftGroup(INSTANCE);
oldLeaderServer.get().stop();
cluster.stream().filter(c -> c.topologyService().localMember().address().equals(oldLeader)).findFirst().get().stop();
@@ -326,14 +324,14 @@ public class ItMetaStorageRaftGroupTest {
metaStorageRaftSrv3.start();
- metaStorageRaftSrv1.startRaftGroup(METASTORAGE_RAFT_GROUP_NAME, new MetaStorageListener(mockStorage), peers, defaults());
+ metaStorageRaftSrv1.startRaftGroup(INSTANCE, new MetaStorageListener(mockStorage), peers, defaults());
- metaStorageRaftSrv2.startRaftGroup(METASTORAGE_RAFT_GROUP_NAME, new MetaStorageListener(mockStorage), peers, defaults());
+ metaStorageRaftSrv2.startRaftGroup(INSTANCE, new MetaStorageListener(mockStorage), peers, defaults());
- metaStorageRaftSrv3.startRaftGroup(METASTORAGE_RAFT_GROUP_NAME, new MetaStorageListener(mockStorage), peers, defaults());
+ metaStorageRaftSrv3.startRaftGroup(INSTANCE, new MetaStorageListener(mockStorage), peers, defaults());
metaStorageRaftGrpSvc1 = RaftGroupServiceImpl.start(
- METASTORAGE_RAFT_GROUP_NAME,
+ INSTANCE,
cluster.get(0),
FACTORY,
10_000,
@@ -344,7 +342,7 @@ public class ItMetaStorageRaftGroupTest {
).get();
metaStorageRaftGrpSvc2 = RaftGroupServiceImpl.start(
- METASTORAGE_RAFT_GROUP_NAME,
+ INSTANCE,
cluster.get(1),
FACTORY,
10_000,
@@ -355,7 +353,7 @@ public class ItMetaStorageRaftGroupTest {
).get();
metaStorageRaftGrpSvc3 = RaftGroupServiceImpl.start(
- METASTORAGE_RAFT_GROUP_NAME,
+ INSTANCE,
cluster.get(2),
FACTORY,
10_000,
diff --git a/modules/metastorage-client/src/integrationTest/java/org/apache/ignite/internal/metastorage/client/ItMetaStorageServicePersistenceTest.java b/modules/metastorage-client/src/integrationTest/java/org/apache/ignite/internal/metastorage/client/ItMetaStorageServicePersistenceTest.java
index ea78834b8b..7b1bb1cf88 100644
--- a/modules/metastorage-client/src/integrationTest/java/org/apache/ignite/internal/metastorage/client/ItMetaStorageServicePersistenceTest.java
+++ b/modules/metastorage-client/src/integrationTest/java/org/apache/ignite/internal/metastorage/client/ItMetaStorageServicePersistenceTest.java
@@ -138,8 +138,8 @@ public class ItMetaStorageServicePersistenceTest extends ItAbstractListenerSnaps
/** {@inheritDoc} */
@Override
- public String raftGroupId() {
- return "metastorage";
+ public TestReplicationGroupId raftGroupId() {
+ return new TestReplicationGroupId("metastorage");
}
/**
diff --git a/modules/metastorage-client/src/integrationTest/java/org/apache/ignite/internal/metastorage/client/ItMetaStorageServiceTest.java b/modules/metastorage-client/src/integrationTest/java/org/apache/ignite/internal/metastorage/client/ItMetaStorageServiceTest.java
index 7929a96bc5..a330eded5a 100644
--- a/modules/metastorage-client/src/integrationTest/java/org/apache/ignite/internal/metastorage/client/ItMetaStorageServiceTest.java
+++ b/modules/metastorage-client/src/integrationTest/java/org/apache/ignite/internal/metastorage/client/ItMetaStorageServiceTest.java
@@ -28,6 +28,7 @@ import static org.apache.ignite.internal.metastorage.client.ItMetaStorageService
import static org.apache.ignite.internal.metastorage.client.Operations.ops;
import static org.apache.ignite.internal.metastorage.client.Operations.put;
import static org.apache.ignite.internal.metastorage.client.Operations.remove;
+import static org.apache.ignite.internal.metastorage.common.MetastorageGroupId.INSTANCE;
import static org.apache.ignite.internal.raft.server.RaftGroupOptions.defaults;
import static org.apache.ignite.raft.jraft.test.TestUtils.waitForTopology;
import static org.apache.ignite.utils.ClusterServiceTestUtils.findLocalAddresses;
@@ -124,8 +125,6 @@ public class ItMetaStorageServiceTest {
/** Nodes. */
private static final int NODES = 2;
- private static final String METASTORAGE_RAFT_GROUP_NAME = "METASTORAGE_RAFT_GROUP";
-
/** Factory. */
private static final RaftMessagesFactory FACTORY = new RaftMessagesFactory();
@@ -250,7 +249,7 @@ public class ItMetaStorageServiceTest {
*/
@AfterEach
public void afterTest() throws Exception {
- metaStorageRaftSrv.stopRaftGroup(METASTORAGE_RAFT_GROUP_NAME);
+ metaStorageRaftSrv.stopRaftGroup(INSTANCE);
metaStorageRaftSrv.stop();
metaStorageRaftGrpSvc.shutdown();
@@ -912,7 +911,7 @@ public class ItMetaStorageServiceTest {
List<Peer> peers = List.of(new Peer(cluster.get(0).topologyService().localMember().address()));
RaftGroupService metaStorageRaftGrpSvc = RaftGroupServiceImpl.start(
- METASTORAGE_RAFT_GROUP_NAME,
+ INSTANCE,
cluster.get(1),
FACTORY,
10_000,
@@ -1065,10 +1064,10 @@ public class ItMetaStorageServiceTest {
metaStorageRaftSrv.start();
- metaStorageRaftSrv.startRaftGroup(METASTORAGE_RAFT_GROUP_NAME, new MetaStorageListener(mockStorage), peers, defaults());
+ metaStorageRaftSrv.startRaftGroup(INSTANCE, new MetaStorageListener(mockStorage), peers, defaults());
metaStorageRaftGrpSvc = RaftGroupServiceImpl.start(
- METASTORAGE_RAFT_GROUP_NAME,
+ INSTANCE,
cluster.get(1),
FACTORY,
10_000,
diff --git a/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/message/ReplicaRequest.java b/modules/metastorage-common/src/main/java/org/apache/ignite/internal/metastorage/common/MetastorageGroupId.java
similarity index 57%
copy from modules/replicator/src/main/java/org/apache/ignite/internal/replicator/message/ReplicaRequest.java
copy to modules/metastorage-common/src/main/java/org/apache/ignite/internal/metastorage/common/MetastorageGroupId.java
index 482f6fea3e..9f61c54ab6 100644
--- a/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/message/ReplicaRequest.java
+++ b/modules/metastorage-common/src/main/java/org/apache/ignite/internal/metastorage/common/MetastorageGroupId.java
@@ -15,18 +15,32 @@
* limitations under the License.
*/
-package org.apache.ignite.internal.replicator.message;
+package org.apache.ignite.internal.metastorage.common;
-import org.apache.ignite.network.NetworkMessage;
+import org.apache.ignite.internal.replicator.ReplicationGroupId;
/**
- * Replica request.
+ * Meta storage replication group id.
*/
-public interface ReplicaRequest extends NetworkMessage {
+public enum MetastorageGroupId implements ReplicationGroupId {
+ /** Meta storage group id. */
+ INSTANCE("metastorage_group");
+
+ /** Group id string representation. */
+ private String name;
+
/**
- * Gets a replication group id.
+ * The constructor.
*
- * @return Replication group id.
+ * @param name The string representation of the enum.
*/
- String groupId();
+ MetastorageGroupId(String name) {
+ this.name = name;
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public String toString() {
+ return name;
+ }
}
diff --git a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/MetaStorageManager.java b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/MetaStorageManager.java
index d661127984..aaba0e1a5b 100644
--- a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/MetaStorageManager.java
+++ b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/MetaStorageManager.java
@@ -17,6 +17,7 @@
package org.apache.ignite.internal.metastorage;
+import static org.apache.ignite.internal.metastorage.common.MetastorageGroupId.INSTANCE;
import static org.apache.ignite.internal.util.ByteUtils.bytesToLong;
import static org.apache.ignite.internal.util.ByteUtils.longToBytes;
import static org.apache.ignite.lang.ErrorGroups.MetaStorage.CURSOR_CLOSING_ERR;
@@ -79,9 +80,6 @@ import org.jetbrains.annotations.Nullable;
* </ul>
*/
public class MetaStorageManager implements IgniteComponent {
- /** Meta storage raft group name. */
- private static final String METASTORAGE_RAFT_GROUP_NAME = "metastorage_raft_group";
-
/**
* Special key for the vault where the applied revision for {@link MetaStorageManager#storeEntries} operation is stored. This mechanism
* is needed for committing processed watches to {@link VaultManager}.
@@ -179,7 +177,7 @@ public class MetaStorageManager implements IgniteComponent {
try {
CompletableFuture<RaftGroupService> raftServiceFuture = raftMgr.prepareRaftGroup(
- METASTORAGE_RAFT_GROUP_NAME,
+ INSTANCE,
metastorageNodes,
() -> new MetaStorageListener(storage),
RaftGroupOptions.defaults()
@@ -228,7 +226,7 @@ public class MetaStorageManager implements IgniteComponent {
synchronized (this) {
IgniteUtils.closeAll(
this::stopDeployedWatches,
- () -> raftMgr.stopRaftGroup(METASTORAGE_RAFT_GROUP_NAME),
+ () -> raftMgr.stopRaftGroup(INSTANCE),
storage
);
}
diff --git a/modules/raft-client/src/main/java/org/apache/ignite/raft/client/service/RaftGroupService.java b/modules/raft-client/src/main/java/org/apache/ignite/raft/client/service/RaftGroupService.java
index a30bcf0a59..d204d7dfd0 100644
--- a/modules/raft-client/src/main/java/org/apache/ignite/raft/client/service/RaftGroupService.java
+++ b/modules/raft-client/src/main/java/org/apache/ignite/raft/client/service/RaftGroupService.java
@@ -20,6 +20,7 @@ package org.apache.ignite.raft.client.service;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeoutException;
+import org.apache.ignite.internal.replicator.ReplicationGroupId;
import org.apache.ignite.lang.IgniteBiTuple;
import org.apache.ignite.network.ClusterService;
import org.apache.ignite.raft.client.Command;
@@ -50,7 +51,7 @@ public interface RaftGroupService {
/**
* Returns group id.
*/
- @NotNull String groupId();
+ @NotNull ReplicationGroupId groupId();
/**
* Returns default timeout for the operations in milliseconds.
diff --git a/modules/raft/src/integrationTest/java/org/apache/ignite/internal/raft/ItLozaTest.java b/modules/raft/src/integrationTest/java/org/apache/ignite/internal/raft/ItLozaTest.java
index feea5f989d..a100881f07 100644
--- a/modules/raft/src/integrationTest/java/org/apache/ignite/internal/raft/ItLozaTest.java
+++ b/modules/raft/src/integrationTest/java/org/apache/ignite/internal/raft/ItLozaTest.java
@@ -32,6 +32,7 @@ import static org.mockito.Mockito.when;
import java.io.IOException;
import java.nio.file.Path;
import java.util.List;
+import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
@@ -39,6 +40,7 @@ import org.apache.ignite.hlc.HybridClock;
import org.apache.ignite.internal.configuration.testframework.ConfigurationExtension;
import org.apache.ignite.internal.configuration.testframework.InjectConfiguration;
import org.apache.ignite.internal.raft.configuration.RaftConfiguration;
+import org.apache.ignite.internal.replicator.ReplicationGroupId;
import org.apache.ignite.internal.testframework.WorkDirectory;
import org.apache.ignite.internal.testframework.WorkDirectoryExtension;
import org.apache.ignite.network.ClusterNode;
@@ -74,7 +76,7 @@ public class ItLozaTest {
*
* @return Raft group service.
*/
- private RaftGroupService startClient(String groupId, ClusterNode node, Loza loza) throws Exception {
+ private RaftGroupService startClient(TestReplicationGroupId groupId, ClusterNode node, Loza loza) throws Exception {
Supplier<RaftGroupListener> raftGroupListenerSupplier = () -> {
RaftGroupListener raftGroupListener = mock(RaftGroupListener.class);
@@ -141,7 +143,7 @@ public class ItLozaTest {
.doCallRealMethod()
.when(messagingServiceMock).invoke(any(NetworkAddress.class), any(), anyLong());
- grpSrvcs[i] = startClient(Integer.toString(i), service.topologyService().localMember(), loza);
+ grpSrvcs[i] = startClient(new TestReplicationGroupId(Integer.toString(i)), service.topologyService().localMember(), loza);
verify(messagingServiceMock, times(3 * (i + 1)))
.invoke(any(NetworkAddress.class), any(), anyLong());
@@ -162,4 +164,37 @@ public class ItLozaTest {
}
}
}
+
+ /**
+ * Test replication group id.
+ */
+ private static class TestReplicationGroupId implements ReplicationGroupId {
+ private final String name;
+
+ public TestReplicationGroupId(String name) {
+ this.name = name;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ TestReplicationGroupId that = (TestReplicationGroupId) o;
+ return Objects.equals(name, that.name);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(name);
+ }
+
+ @Override
+ public String toString() {
+ return name;
+ }
+ }
}
diff --git a/modules/raft/src/integrationTest/java/org/apache/ignite/internal/raft/ItRaftGroupServiceTest.java b/modules/raft/src/integrationTest/java/org/apache/ignite/internal/raft/ItRaftGroupServiceTest.java
index fd2ee082e8..f17cb31bd3 100644
--- a/modules/raft/src/integrationTest/java/org/apache/ignite/internal/raft/ItRaftGroupServiceTest.java
+++ b/modules/raft/src/integrationTest/java/org/apache/ignite/internal/raft/ItRaftGroupServiceTest.java
@@ -29,12 +29,14 @@ import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;
import org.apache.ignite.hlc.HybridClock;
import org.apache.ignite.internal.configuration.testframework.ConfigurationExtension;
import org.apache.ignite.internal.configuration.testframework.InjectConfiguration;
import org.apache.ignite.internal.raft.configuration.RaftConfiguration;
+import org.apache.ignite.internal.replicator.ReplicationGroupId;
import org.apache.ignite.internal.testframework.WorkDirectory;
import org.apache.ignite.internal.testframework.WorkDirectoryExtension;
import org.apache.ignite.network.ClusterNode;
@@ -70,7 +72,7 @@ public class ItRaftGroupServiceTest {
private static final int NODE_PORT_BASE = 20_000;
- private static final String RAFT_GROUP_NAME = "part1";
+ private static final TestReplicationGroupId RAFT_GROUP_NAME = new TestReplicationGroupId("part1");
private static List<ClusterService> clusterServices = new ArrayList<>();
@@ -161,4 +163,37 @@ public class ItRaftGroupServiceTest {
return expectedNewLeaderPeer.equals(raftGroups.get(newLeaderNode).leader());
}, 10_000));
}
+
+ /**
+ * Test replication group id.
+ */
+ private static class TestReplicationGroupId implements ReplicationGroupId {
+ private final String name;
+
+ public TestReplicationGroupId(String name) {
+ this.name = name;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ TestReplicationGroupId that = (TestReplicationGroupId) o;
+ return Objects.equals(name, that.name);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(name);
+ }
+
+ @Override
+ public String toString() {
+ return name;
+ }
+ }
}
diff --git a/modules/raft/src/integrationTest/java/org/apache/ignite/raft/server/ItJraftCounterServerTest.java b/modules/raft/src/integrationTest/java/org/apache/ignite/raft/server/ItJraftCounterServerTest.java
index dbcfd720ff..3a16b68619 100644
--- a/modules/raft/src/integrationTest/java/org/apache/ignite/raft/server/ItJraftCounterServerTest.java
+++ b/modules/raft/src/integrationTest/java/org/apache/ignite/raft/server/ItJraftCounterServerTest.java
@@ -58,6 +58,7 @@ import org.apache.ignite.internal.logger.Loggers;
import org.apache.ignite.internal.raft.Loza;
import org.apache.ignite.internal.raft.server.RaftServer;
import org.apache.ignite.internal.raft.server.impl.JraftServerImpl;
+import org.apache.ignite.internal.replicator.ReplicationGroupId;
import org.apache.ignite.internal.testframework.WorkDirectory;
import org.apache.ignite.internal.testframework.WorkDirectoryExtension;
import org.apache.ignite.internal.thread.NamedThreadFactory;
@@ -103,12 +104,12 @@ class ItJraftCounterServerTest extends RaftServerAbstractTest {
/**
* Counter group name 0.
*/
- private static final String COUNTER_GROUP_0 = "counter0";
+ private static final TestReplicationGroupId COUNTER_GROUP_0 = new TestReplicationGroupId("counter0");
/**
* Counter group name 1.
*/
- private static final String COUNTER_GROUP_1 = "counter1";
+ private static final TestReplicationGroupId COUNTER_GROUP_1 = new TestReplicationGroupId("counter1");
/**
* The server port offset.
@@ -203,9 +204,9 @@ class ItJraftCounterServerTest extends RaftServerAbstractTest {
iterSrv.remove();
- Set<String> grps = server.startedGroups();
+ Set<ReplicationGroupId> grps = server.startedGroups();
- for (String grp : grps) {
+ for (ReplicationGroupId grp : grps) {
server.stopRaftGroup(grp);
}
@@ -263,7 +264,7 @@ class ItJraftCounterServerTest extends RaftServerAbstractTest {
* @return The client.
* @throws Exception If failed.
*/
- private RaftGroupService startClient(String groupId) throws Exception {
+ private RaftGroupService startClient(TestReplicationGroupId groupId) throws Exception {
var addr = new NetworkAddress(getLocalAddress(), PORT);
ClusterService clientNode = clusterService(CLIENT_PORT + clients.size(), List.of(addr), true);
@@ -301,7 +302,7 @@ class ItJraftCounterServerTest extends RaftServerAbstractTest {
@Test
public void testDisruptorThreadsCount() {
startServer(0, raftServer -> {
- raftServer.startRaftGroup("test_raft_group", listenerFactory.get(), INITIAL_CONF, defaults());
+ raftServer.startRaftGroup(new TestReplicationGroupId("test_raft_group"), listenerFactory.get(), INITIAL_CONF, defaults());
}, opts -> {});
Set<Thread> threads = getAllDisruptorCurrentThreads();
@@ -314,7 +315,7 @@ class ItJraftCounterServerTest extends RaftServerAbstractTest {
servers.forEach(srv -> {
for (int i = 0; i < 10; i++) {
- srv.startRaftGroup("test_raft_group_" + i, listenerFactory.get(), INITIAL_CONF, defaults());
+ srv.startRaftGroup(new TestReplicationGroupId("test_raft_group_" + i), listenerFactory.get(), INITIAL_CONF, defaults());
}
});
@@ -329,10 +330,10 @@ class ItJraftCounterServerTest extends RaftServerAbstractTest {
assertEquals(threadsBefore, threadsAfter, "Difference: " + threadNamesAfter);
servers.forEach(srv -> {
- srv.stopRaftGroup("test_raft_group");
+ srv.stopRaftGroup(new TestReplicationGroupId("test_raft_group"));
for (int i = 0; i < 10; i++) {
- srv.stopRaftGroup("test_raft_group_" + i);
+ srv.stopRaftGroup(new TestReplicationGroupId("test_raft_group_" + i));
}
});
}
@@ -695,7 +696,7 @@ class ItJraftCounterServerTest extends RaftServerAbstractTest {
futs.add(svc.submit(new Runnable() {
@Override
public void run() {
- String grp = "counter" + finalI;
+ TestReplicationGroupId grp = new TestReplicationGroupId("counter" + finalI);
srv0.startRaftGroup(grp, listenerFactory.get(), INITIAL_CONF, defaults());
srv1.startRaftGroup(grp, listenerFactory.get(), INITIAL_CONF, defaults());
@@ -716,7 +717,7 @@ class ItJraftCounterServerTest extends RaftServerAbstractTest {
}
for (int i = 0; i < groupsCnt; i++) {
- String grp = "counter" + i;
+ TestReplicationGroupId grp = new TestReplicationGroupId("counter" + i);
assertTrue(waitForCondition(() -> hasLeader(grp), 30_000));
}
@@ -742,6 +743,7 @@ class ItJraftCounterServerTest extends RaftServerAbstractTest {
HashMap<Integer, AtomicInteger> counters = new HashMap<>(3);
HashMap<Path, Integer> snapshotDataStorage = new HashMap<>(3);
HashMap<String, SnapshotMeta> snapshotMetaStorage = new HashMap<>(3);
+ TestReplicationGroupId grpId = new TestReplicationGroupId("test_raft_group");
for (int i = 0; i < 3; i++) {
AtomicInteger counter;
@@ -749,12 +751,12 @@ class ItJraftCounterServerTest extends RaftServerAbstractTest {
counters.put(i, counter = new AtomicInteger());
startServer(i, raftServer -> {
- raftServer.startRaftGroup("test_raft_group", new UpdateCountRaftListener(counter, snapshotDataStorage), INITIAL_CONF,
+ raftServer.startRaftGroup(grpId, new UpdateCountRaftListener(counter, snapshotDataStorage), INITIAL_CONF,
defaults().snapshotStorageFactory(new SnapshotInMemoryStorageFactory(snapshotMetaStorage)));
}, opts -> {});
}
- var raftClient = startClient("test_raft_group");
+ var raftClient = startClient(grpId);
raftClient.refreshMembers(true).get();
var peers = raftClient.peers();
@@ -797,7 +799,7 @@ class ItJraftCounterServerTest extends RaftServerAbstractTest {
startServer(i, raftServer -> {
counter.set(0);
- raftServer.startRaftGroup("test_raft_group", new UpdateCountRaftListener(counter, snapshotDataStorage), INITIAL_CONF,
+ raftServer.startRaftGroup(grpId, new UpdateCountRaftListener(counter, snapshotDataStorage), INITIAL_CONF,
defaults().snapshotStorageFactory(new SnapshotInMemoryStorageFactory(snapshotMetaStorage)));
}, opts -> {});
}
@@ -839,7 +841,7 @@ class ItJraftCounterServerTest extends RaftServerAbstractTest {
* @param grpId Group id.
* @return {@code True} if a leader is elected.
*/
- private boolean hasLeader(String grpId) {
+ private boolean hasLeader(TestReplicationGroupId grpId) {
return servers.stream().anyMatch(s -> {
NodeImpl node = (NodeImpl) s.raftGroupService(grpId).getRaftNode();
@@ -978,7 +980,7 @@ class ItJraftCounterServerTest extends RaftServerAbstractTest {
* @param groupId Group id.
* @return Validation result.
*/
- private static boolean validateStateMachine(long expected, JraftServerImpl server, String groupId) {
+ private static boolean validateStateMachine(long expected, JraftServerImpl server, TestReplicationGroupId groupId) {
org.apache.ignite.raft.jraft.RaftGroupService svc = server.raftGroupService(groupId);
JraftServerImpl.DelegatingStateMachine fsm0 =
diff --git a/modules/raft/src/integrationTest/java/org/apache/ignite/raft/server/ItJraftHlcServerTest.java b/modules/raft/src/integrationTest/java/org/apache/ignite/raft/server/ItJraftHlcServerTest.java
index e7a219e13c..2ae91d709d 100644
--- a/modules/raft/src/integrationTest/java/org/apache/ignite/raft/server/ItJraftHlcServerTest.java
+++ b/modules/raft/src/integrationTest/java/org/apache/ignite/raft/server/ItJraftHlcServerTest.java
@@ -37,6 +37,7 @@ import org.apache.ignite.internal.logger.IgniteLogger;
import org.apache.ignite.internal.logger.Loggers;
import org.apache.ignite.internal.raft.server.RaftServer;
import org.apache.ignite.internal.raft.server.impl.JraftServerImpl;
+import org.apache.ignite.internal.replicator.ReplicationGroupId;
import org.apache.ignite.internal.testframework.WorkDirectory;
import org.apache.ignite.internal.testframework.WorkDirectoryExtension;
import org.apache.ignite.network.ClusterService;
@@ -108,9 +109,9 @@ class ItJraftHlcServerTest extends RaftServerAbstractTest {
iterSrv.remove();
- Set<String> grps = server.startedGroups();
+ Set<ReplicationGroupId> grps = server.startedGroups();
- for (String grp : grps) {
+ for (ReplicationGroupId grp : grps) {
server.stopRaftGroup(grp);
}
@@ -170,12 +171,12 @@ class ItJraftHlcServerTest extends RaftServerAbstractTest {
@Test
public void testHlcOneInstancePerIgniteNode() {
startServer(0, raftServer -> {
- raftServer.startRaftGroup("test_raft_group", listenerFactory.get(), INITIAL_CONF, defaults());
+ raftServer.startRaftGroup(new TestReplicationGroupId("test_raft_group"), listenerFactory.get(), INITIAL_CONF, defaults());
}, opts -> {});
servers.forEach(srv -> {
for (int i = 0; i < 5; i++) {
- srv.startRaftGroup("test_raft_group_" + i, listenerFactory.get(), INITIAL_CONF, defaults());
+ srv.startRaftGroup(new TestReplicationGroupId("test_raft_group_" + i), listenerFactory.get(), INITIAL_CONF, defaults());
}
});
@@ -199,10 +200,10 @@ class ItJraftHlcServerTest extends RaftServerAbstractTest {
});
servers.forEach(srv -> {
- srv.stopRaftGroup("test_raft_group");
+ srv.stopRaftGroup(new TestReplicationGroupId("test_raft_group"));
for (int i = 0; i < 10; i++) {
- srv.stopRaftGroup("test_raft_group_" + i);
+ srv.stopRaftGroup(new TestReplicationGroupId("test_raft_group_" + i));
}
});
}
diff --git a/modules/raft/src/integrationTest/java/org/apache/ignite/raft/server/ItSimpleCounterServerTest.java b/modules/raft/src/integrationTest/java/org/apache/ignite/raft/server/ItSimpleCounterServerTest.java
index 07ee77475a..b42a0aa9cf 100644
--- a/modules/raft/src/integrationTest/java/org/apache/ignite/raft/server/ItSimpleCounterServerTest.java
+++ b/modules/raft/src/integrationTest/java/org/apache/ignite/raft/server/ItSimpleCounterServerTest.java
@@ -63,12 +63,12 @@ class ItSimpleCounterServerTest extends RaftServerAbstractTest {
/**
* Counter raft group 0.
*/
- private static final String COUNTER_GROUP_ID_0 = "counter0";
+ private static final TestReplicationGroupId COUNTER_GROUP_ID_0 = new TestReplicationGroupId("counter0");
/**
* Counter raft group 1.
*/
- private static final String COUNTER_GROUP_ID_1 = "counter1";
+ private static final TestReplicationGroupId COUNTER_GROUP_ID_1 = new TestReplicationGroupId("counter1");
/**
* The client 1.
diff --git a/modules/raft/src/integrationTest/java/org/apache/ignite/raft/server/RaftServerAbstractTest.java b/modules/raft/src/integrationTest/java/org/apache/ignite/raft/server/RaftServerAbstractTest.java
index 7f32b9805d..100abc8340 100644
--- a/modules/raft/src/integrationTest/java/org/apache/ignite/raft/server/RaftServerAbstractTest.java
+++ b/modules/raft/src/integrationTest/java/org/apache/ignite/raft/server/RaftServerAbstractTest.java
@@ -19,8 +19,10 @@ package org.apache.ignite.raft.server;
import java.util.ArrayList;
import java.util.List;
+import java.util.Objects;
import org.apache.ignite.internal.logger.IgniteLogger;
import org.apache.ignite.internal.logger.Loggers;
+import org.apache.ignite.internal.replicator.ReplicationGroupId;
import org.apache.ignite.network.ClusterService;
import org.apache.ignite.network.NetworkAddress;
import org.apache.ignite.network.StaticNodeFinder;
@@ -80,4 +82,37 @@ abstract class RaftServerAbstractTest {
return network;
}
+
+ /**
+ * Test replication group id.
+ */
+ protected static class TestReplicationGroupId implements ReplicationGroupId {
+ private final String name;
+
+ public TestReplicationGroupId(String name) {
+ this.name = name;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ TestReplicationGroupId that = (TestReplicationGroupId) o;
+ return Objects.equals(name, that.name);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(name);
+ }
+
+ @Override
+ public String toString() {
+ return name;
+ }
+ }
}
diff --git a/modules/raft/src/main/java/org/apache/ignite/internal/raft/Loza.java b/modules/raft/src/main/java/org/apache/ignite/internal/raft/Loza.java
index 572d39d301..387b8008fa 100644
--- a/modules/raft/src/main/java/org/apache/ignite/internal/raft/Loza.java
+++ b/modules/raft/src/main/java/org/apache/ignite/internal/raft/Loza.java
@@ -38,6 +38,7 @@ import org.apache.ignite.internal.raft.server.RaftGroupEventsListener;
import org.apache.ignite.internal.raft.server.RaftGroupOptions;
import org.apache.ignite.internal.raft.server.RaftServer;
import org.apache.ignite.internal.raft.server.impl.JraftServerImpl;
+import org.apache.ignite.internal.replicator.ReplicationGroupId;
import org.apache.ignite.internal.thread.NamedThreadFactory;
import org.apache.ignite.internal.util.IgniteSpinBusyLock;
import org.apache.ignite.internal.util.IgniteUtils;
@@ -165,7 +166,7 @@ public class Loza implements IgniteComponent {
* @throws NodeStoppingException If node stopping intention was detected.
*/
public CompletableFuture<RaftGroupService> prepareRaftGroup(
- String groupId,
+ ReplicationGroupId groupId,
List<ClusterNode> nodes,
Supplier<RaftGroupListener> lsnrSupplier,
RaftGroupOptions groupOptions
@@ -186,7 +187,7 @@ public class Loza implements IgniteComponent {
* @throws NodeStoppingException If node stopping intention was detected.
*/
public CompletableFuture<RaftGroupService> prepareRaftGroup(
- String groupId,
+ ReplicationGroupId groupId,
List<ClusterNode> nodes,
Supplier<RaftGroupListener> lsnrSupplier,
Supplier<RaftGroupEventsListener> raftGrpEvtsLsnrSupplier,
@@ -214,7 +215,7 @@ public class Loza implements IgniteComponent {
* @return Future representing pending completion of the operation.
*/
private CompletableFuture<RaftGroupService> prepareRaftGroupInternal(
- String groupId,
+ ReplicationGroupId groupId,
List<ClusterNode> nodes,
Supplier<RaftGroupListener> lsnrSupplier,
Supplier<RaftGroupEventsListener> raftGrpEvtsLsnrSupplier,
@@ -261,7 +262,7 @@ public class Loza implements IgniteComponent {
* @throws NodeStoppingException If node stopping intention was detected.
*/
public void startRaftGroupNode(
- String grpId,
+ ReplicationGroupId grpId,
Collection<ClusterNode> nodes,
RaftGroupListener lsnr,
RaftGroupEventsListener raftGrpEvtsLsnr,
@@ -298,7 +299,7 @@ public class Loza implements IgniteComponent {
* @throws NodeStoppingException If node stopping intention was detected.
*/
public CompletableFuture<RaftGroupService> startRaftGroupService(
- String grpId,
+ ReplicationGroupId grpId,
Collection<ClusterNode> nodes
) throws NodeStoppingException {
if (!busyLock.enterBusy()) {
@@ -330,7 +331,7 @@ public class Loza implements IgniteComponent {
* @param groupId Raft group id.
* @throws NodeStoppingException If node stopping intention was detected.
*/
- public void stopRaftGroup(String groupId) throws NodeStoppingException {
+ public void stopRaftGroup(ReplicationGroupId groupId) throws NodeStoppingException {
if (!busyLock.enterBusy()) {
throw new NodeStoppingException();
}
@@ -392,7 +393,7 @@ public class Loza implements IgniteComponent {
* @return Started groups.
*/
@TestOnly
- public Set<String> startedGroups() {
+ public Set<ReplicationGroupId> startedGroups() {
return raftServer.startedGroups();
}
}
diff --git a/modules/raft/src/main/java/org/apache/ignite/internal/raft/server/RaftServer.java b/modules/raft/src/main/java/org/apache/ignite/internal/raft/server/RaftServer.java
index 41ebcf78ca..8cade1e588 100644
--- a/modules/raft/src/main/java/org/apache/ignite/internal/raft/server/RaftServer.java
+++ b/modules/raft/src/main/java/org/apache/ignite/internal/raft/server/RaftServer.java
@@ -20,6 +20,7 @@ package org.apache.ignite.internal.raft.server;
import java.util.List;
import java.util.Set;
import org.apache.ignite.internal.manager.IgniteComponent;
+import org.apache.ignite.internal.replicator.ReplicationGroupId;
import org.apache.ignite.network.ClusterService;
import org.apache.ignite.raft.client.Peer;
import org.apache.ignite.raft.client.service.RaftGroupListener;
@@ -46,7 +47,7 @@ public interface RaftServer extends IgniteComponent {
* @param groupOptions Options to apply to the group.
* @return {@code True} if a group was successfully started, {@code False} when the group with given name is already exists.
*/
- boolean startRaftGroup(String groupId, RaftGroupListener lsnr, List<Peer> initialConf, RaftGroupOptions groupOptions);
+ boolean startRaftGroup(ReplicationGroupId groupId, RaftGroupListener lsnr, List<Peer> initialConf, RaftGroupOptions groupOptions);
/**
* Starts a raft group bound to this cluster node.
@@ -59,7 +60,7 @@ public interface RaftServer extends IgniteComponent {
* @return {@code True} if a group was successfully started, {@code False} when the group with given name is already exists.
*/
boolean startRaftGroup(
- String groupId,
+ ReplicationGroupId groupId,
RaftGroupEventsListener evLsnr,
RaftGroupListener lsnr,
List<Peer> initialConf,
@@ -72,7 +73,7 @@ public interface RaftServer extends IgniteComponent {
* @param groupId Group id.
* @return {@code True} if a group was successfully stopped.
*/
- boolean stopRaftGroup(String groupId);
+ boolean stopRaftGroup(ReplicationGroupId groupId);
/**
* Returns a local peer.
@@ -80,7 +81,7 @@ public interface RaftServer extends IgniteComponent {
* @param groupId Group id.
* @return Local peer or null if the group is not started.
*/
- @Nullable Peer localPeer(String groupId);
+ @Nullable Peer localPeer(ReplicationGroupId groupId);
/**
* Returns a set of started partition groups.
@@ -88,5 +89,5 @@ public interface RaftServer extends IgniteComponent {
* @return Started groups.
*/
@TestOnly
- Set<String> startedGroups();
+ Set<ReplicationGroupId> startedGroups();
}
diff --git a/modules/raft/src/main/java/org/apache/ignite/internal/raft/server/impl/JraftServerImpl.java b/modules/raft/src/main/java/org/apache/ignite/internal/raft/server/impl/JraftServerImpl.java
index d4450a47a7..f685423809 100644
--- a/modules/raft/src/main/java/org/apache/ignite/internal/raft/server/impl/JraftServerImpl.java
+++ b/modules/raft/src/main/java/org/apache/ignite/internal/raft/server/impl/JraftServerImpl.java
@@ -40,6 +40,7 @@ import org.apache.ignite.internal.raft.server.RaftServer;
import org.apache.ignite.internal.raft.storage.LogStorageFactory;
import org.apache.ignite.internal.raft.storage.impl.DefaultLogStorageFactory;
import org.apache.ignite.internal.raft.storage.impl.IgniteJraftServiceFactory;
+import org.apache.ignite.internal.replicator.ReplicationGroupId;
import org.apache.ignite.internal.thread.NamedThreadFactory;
import org.apache.ignite.lang.IgniteInternalException;
import org.apache.ignite.lang.IgniteStringFormatter;
@@ -95,7 +96,7 @@ public class JraftServerImpl implements RaftServer {
private IgniteRpcServer rpcServer;
/** Started groups. */
- private final ConcurrentMap<String, RaftGroupService> groups = new ConcurrentHashMap<>();
+ private final ConcurrentMap<ReplicationGroupId, RaftGroupService> groups = new ConcurrentHashMap<>();
/** Lock storage with predefined monitor objects,
* needed to prevent concurrent start of the same raft group. */
@@ -324,7 +325,7 @@ public class JraftServerImpl implements RaftServer {
* @param groupId Group id.
* @return The path to persistence folder.
*/
- public Path getServerDataPath(String groupId) {
+ public Path getServerDataPath(ReplicationGroupId groupId) {
ClusterNode clusterNode = service.topologyService().localMember();
String dirName = groupId + "_" + clusterNode.address().toString().replace(':', '_');
@@ -335,7 +336,7 @@ public class JraftServerImpl implements RaftServer {
/** {@inheritDoc} */
@Override
public boolean startRaftGroup(
- String groupId,
+ ReplicationGroupId groupId,
RaftGroupListener lsnr,
@Nullable List<Peer> initialConf,
RaftGroupOptions groupOptions
@@ -346,20 +347,22 @@ public class JraftServerImpl implements RaftServer {
/** {@inheritDoc} */
@Override
public boolean startRaftGroup(
- String grpId,
+ ReplicationGroupId replicaGrpId,
RaftGroupEventsListener evLsnr,
RaftGroupListener lsnr,
@Nullable List<Peer> initialConf,
RaftGroupOptions groupOptions
) {
+ String grpId = replicaGrpId.toString();
+
// fast track to check if group with the same name is already created.
- if (groups.containsKey(grpId)) {
+ if (groups.containsKey(replicaGrpId)) {
return false;
}
synchronized (groupMonitor(grpId)) {
// double check if group wasn't created before receiving the lock.
- if (groups.containsKey(grpId)) {
+ if (groups.containsKey(replicaGrpId)) {
return false;
}
@@ -367,7 +370,7 @@ public class JraftServerImpl implements RaftServer {
NodeOptions nodeOptions = opts.copy();
// TODO: IGNITE-17083 - Do not create paths for volatile stores at all when we get rid of snapshot storage on FS.
- Path serverDataPath = getServerDataPath(grpId);
+ Path serverDataPath = getServerDataPath(replicaGrpId);
try {
Files.createDirectories(serverDataPath);
@@ -418,7 +421,7 @@ public class JraftServerImpl implements RaftServer {
server.start();
- groups.put(grpId, server);
+ groups.put(replicaGrpId, server);
return true;
}
@@ -426,7 +429,7 @@ public class JraftServerImpl implements RaftServer {
/** {@inheritDoc} */
@Override
- public boolean stopRaftGroup(String grpId) {
+ public boolean stopRaftGroup(ReplicationGroupId grpId) {
RaftGroupService svc = groups.remove(grpId);
boolean stopped = svc != null;
@@ -440,7 +443,7 @@ public class JraftServerImpl implements RaftServer {
/** {@inheritDoc} */
@Override
- public Peer localPeer(String groupId) {
+ public Peer localPeer(ReplicationGroupId groupId) {
RaftGroupService service = groups.get(groupId);
if (service == null) {
@@ -458,13 +461,13 @@ public class JraftServerImpl implements RaftServer {
* @param groupId Group id.
* @return Service group.
*/
- public RaftGroupService raftGroupService(String groupId) {
+ public RaftGroupService raftGroupService(ReplicationGroupId groupId) {
return groups.get(groupId);
}
/** {@inheritDoc} */
@Override
- public Set<String> startedGroups() {
+ public Set<ReplicationGroupId> startedGroups() {
return groups.keySet();
}
@@ -475,7 +478,7 @@ public class JraftServerImpl implements RaftServer {
* @param predicate Predicate to block messages.
*/
@TestOnly
- public void blockMessages(String groupId, BiPredicate<Object, String> predicate) {
+ public void blockMessages(ReplicationGroupId groupId, BiPredicate<Object, String> predicate) {
IgniteRpcClient client = (IgniteRpcClient) groups.get(groupId).getNodeOptions().getRpcClient();
client.blockMessages(predicate);
@@ -487,7 +490,7 @@ public class JraftServerImpl implements RaftServer {
* @param groupId Raft group id.
*/
@TestOnly
- public void stopBlockMessages(String groupId) {
+ public void stopBlockMessages(ReplicationGroupId groupId) {
IgniteRpcClient client = (IgniteRpcClient) groups.get(groupId).getNodeOptions().getRpcClient();
client.stopBlock();
diff --git a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/RaftGroupServiceImpl.java b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/RaftGroupServiceImpl.java
index d64dedb82e..79dbe0aef4 100644
--- a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/RaftGroupServiceImpl.java
+++ b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/RaftGroupServiceImpl.java
@@ -54,6 +54,7 @@ import java.util.function.Function;
import java.util.stream.Collectors;
import org.apache.ignite.internal.logger.IgniteLogger;
import org.apache.ignite.internal.logger.Loggers;
+import org.apache.ignite.internal.replicator.ReplicationGroupId;
import org.apache.ignite.internal.tostring.S;
import org.apache.ignite.lang.IgniteBiTuple;
import org.apache.ignite.lang.IgniteException;
@@ -90,6 +91,8 @@ public class RaftGroupServiceImpl implements RaftGroupService {
/** */
private final String groupId;
+ private final ReplicationGroupId realGroupId;
+
/** */
private final RaftMessagesFactory factory;
@@ -124,7 +127,7 @@ public class RaftGroupServiceImpl implements RaftGroupService {
* @param executor Executor for retrying requests.
*/
private RaftGroupServiceImpl(
- String groupId,
+ ReplicationGroupId groupId,
ClusterService cluster,
RaftMessagesFactory factory,
int timeout,
@@ -140,7 +143,8 @@ public class RaftGroupServiceImpl implements RaftGroupService {
this.factory = factory;
this.timeout = timeout;
this.rpcTimeout = rpcTimeout;
- this.groupId = groupId;
+ this.groupId = groupId.toString();
+ this.realGroupId = groupId;
this.retryDelay = retryDelay;
this.leader = leader;
this.executor = executor;
@@ -161,7 +165,7 @@ public class RaftGroupServiceImpl implements RaftGroupService {
* @return Future representing pending completion of the operation.
*/
public static CompletableFuture<RaftGroupService> start(
- String groupId,
+ ReplicationGroupId groupId,
ClusterService cluster,
RaftMessagesFactory factory,
int timeout,
@@ -206,7 +210,7 @@ public class RaftGroupServiceImpl implements RaftGroupService {
* @return Future representing pending completion of the operation.
*/
public static CompletableFuture<RaftGroupService> start(
- String groupId,
+ ReplicationGroupId groupId,
ClusterService cluster,
RaftMessagesFactory factory,
int timeout,
@@ -219,8 +223,8 @@ public class RaftGroupServiceImpl implements RaftGroupService {
}
/** {@inheritDoc} */
- @Override public @NotNull String groupId() {
- return groupId;
+ @Override public @NotNull ReplicationGroupId groupId() {
+ return realGroupId;
}
/** {@inheritDoc} */
diff --git a/modules/raft/src/test/java/org/apache/ignite/internal/raft/LozaTest.java b/modules/raft/src/test/java/org/apache/ignite/internal/raft/LozaTest.java
index 3e2a2f12b5..3960ba2d57 100644
--- a/modules/raft/src/test/java/org/apache/ignite/internal/raft/LozaTest.java
+++ b/modules/raft/src/test/java/org/apache/ignite/internal/raft/LozaTest.java
@@ -22,10 +22,12 @@ import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.mockito.Mockito.mock;
import java.util.List;
+import java.util.Objects;
import java.util.UUID;
import java.util.function.Supplier;
import org.apache.ignite.hlc.HybridClock;
import org.apache.ignite.internal.raft.configuration.RaftConfiguration;
+import org.apache.ignite.internal.replicator.ReplicationGroupId;
import org.apache.ignite.internal.testframework.IgniteAbstractTest;
import org.apache.ignite.lang.NodeStoppingException;
import org.apache.ignite.network.ClusterLocalConfiguration;
@@ -70,7 +72,7 @@ public class LozaTest extends IgniteAbstractTest {
loza.beforeNodeStop();
loza.stop();
- String raftGroupId = "test_raft_group";
+ TestReplicationGroupId raftGroupId = new TestReplicationGroupId("test_raft_group");
List<ClusterNode> nodes = List.of(
new ClusterNode(UUID.randomUUID().toString(), UUID.randomUUID().toString(), NetworkAddress.from("127.0.0.1:123")));
@@ -91,4 +93,37 @@ public class LozaTest extends IgniteAbstractTest {
() -> loza.prepareRaftGroup(raftGroupId, nodes, lsnrSupplier, defaults())
);
}
+
+ /**
+ * Test replication group id.
+ */
+ private static class TestReplicationGroupId implements ReplicationGroupId {
+ private final String name;
+
+ public TestReplicationGroupId(String name) {
+ this.name = name;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ TestReplicationGroupId that = (TestReplicationGroupId) o;
+ return Objects.equals(name, that.name);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(name);
+ }
+
+ @Override
+ public String toString() {
+ return name;
+ }
+ }
}
diff --git a/modules/raft/src/test/java/org/apache/ignite/raft/jraft/core/RaftGroupServiceTest.java b/modules/raft/src/test/java/org/apache/ignite/raft/jraft/core/RaftGroupServiceTest.java
index 300c670146..61cc3d540e 100644
--- a/modules/raft/src/test/java/org/apache/ignite/raft/jraft/core/RaftGroupServiceTest.java
+++ b/modules/raft/src/test/java/org/apache/ignite/raft/jraft/core/RaftGroupServiceTest.java
@@ -35,6 +35,7 @@ import static org.mockito.Mockito.when;
import java.net.ConnectException;
import java.util.Collections;
import java.util.List;
+import java.util.Objects;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.CompletableFuture;
@@ -48,6 +49,7 @@ import java.util.stream.Stream;
import org.apache.ignite.internal.logger.IgniteLogger;
import org.apache.ignite.internal.logger.Loggers;
import org.apache.ignite.internal.raft.Loza;
+import org.apache.ignite.internal.replicator.ReplicationGroupId;
import org.apache.ignite.internal.thread.NamedThreadFactory;
import org.apache.ignite.internal.util.IgniteUtils;
import org.apache.ignite.lang.IgniteInternalException;
@@ -107,6 +109,9 @@ public class RaftGroupServiceTest {
/** Current term */
private static final int CURRENT_TERM = 1;
+ /** Test group id. */
+ private static final TestReplicationGroupId TEST_GRP = new TestReplicationGroupId("test");
+
/** Mock cluster. */
@Mock
private ClusterService cluster;
@@ -143,12 +148,10 @@ public class RaftGroupServiceTest {
*/
@Test
public void testRefreshLeaderStable() throws Exception {
- String groupId = "test";
-
mockLeaderRequest(false);
RaftGroupService service =
- RaftGroupServiceImpl.start(groupId, cluster, FACTORY, TIMEOUT, NODES, false, DELAY, executor).get(3, TimeUnit.SECONDS);
+ RaftGroupServiceImpl.start(TEST_GRP, cluster, FACTORY, TIMEOUT, NODES, false, DELAY, executor).get(3, TimeUnit.SECONDS);
assertNull(service.leader());
@@ -162,15 +165,13 @@ public class RaftGroupServiceTest {
*/
@Test
public void testRefreshLeaderNotElected() throws Exception {
- String groupId = "test";
-
mockLeaderRequest(false);
// Simulate running elections.
leader = null;
RaftGroupService service =
- RaftGroupServiceImpl.start(groupId, cluster, FACTORY, TIMEOUT, NODES, false, DELAY, executor).get(3, TimeUnit.SECONDS);
+ RaftGroupServiceImpl.start(TEST_GRP, cluster, FACTORY, TIMEOUT, NODES, false, DELAY, executor).get(3, TimeUnit.SECONDS);
assertNull(service.leader());
@@ -189,8 +190,6 @@ public class RaftGroupServiceTest {
*/
@Test
public void testRefreshLeaderElectedAfterDelay() throws Exception {
- String groupId = "test";
-
mockLeaderRequest(false);
// Simulate running elections.
@@ -205,7 +204,7 @@ public class RaftGroupServiceTest {
}, 500);
RaftGroupService service =
- RaftGroupServiceImpl.start(groupId, cluster, FACTORY, TIMEOUT, NODES, false, DELAY, executor).get(3, TimeUnit.SECONDS);
+ RaftGroupServiceImpl.start(TEST_GRP, cluster, FACTORY, TIMEOUT, NODES, false, DELAY, executor).get(3, TimeUnit.SECONDS);
assertNull(service.leader());
@@ -219,12 +218,10 @@ public class RaftGroupServiceTest {
*/
@Test
public void testRefreshLeaderWithTimeout() throws Exception {
- String groupId = "test";
-
mockLeaderRequest(true);
RaftGroupService service =
- RaftGroupServiceImpl.start(groupId, cluster, FACTORY, TIMEOUT, NODES, false, DELAY, executor).get(3, TimeUnit.SECONDS);
+ RaftGroupServiceImpl.start(TEST_GRP, cluster, FACTORY, TIMEOUT, NODES, false, DELAY, executor).get(3, TimeUnit.SECONDS);
try {
service.refreshLeader().get(500, TimeUnit.MILLISECONDS);
@@ -241,13 +238,11 @@ public class RaftGroupServiceTest {
*/
@Test
public void testUserRequestLeaderElected() throws Exception {
- String groupId = "test";
-
mockLeaderRequest(false);
mockUserInput(false, null);
RaftGroupService service =
- RaftGroupServiceImpl.start(groupId, cluster, FACTORY, TIMEOUT, NODES, false, DELAY, executor).get(3, TimeUnit.SECONDS);
+ RaftGroupServiceImpl.start(TEST_GRP, cluster, FACTORY, TIMEOUT, NODES, false, DELAY, executor).get(3, TimeUnit.SECONDS);
service.refreshLeader().get();
@@ -261,13 +256,11 @@ public class RaftGroupServiceTest {
*/
@Test
public void testUserRequestLazyInitLeader() throws Exception {
- String groupId = "test";
-
mockLeaderRequest(false);
mockUserInput(false, null);
RaftGroupService service =
- RaftGroupServiceImpl.start(groupId, cluster, FACTORY, TIMEOUT, NODES, false, DELAY, executor).get(3, TimeUnit.SECONDS);
+ RaftGroupServiceImpl.start(TEST_GRP, cluster, FACTORY, TIMEOUT, NODES, false, DELAY, executor).get(3, TimeUnit.SECONDS);
assertNull(service.leader());
@@ -283,13 +276,11 @@ public class RaftGroupServiceTest {
*/
@Test
public void testUserRequestWithTimeout() throws Exception {
- String groupId = "test";
-
mockLeaderRequest(false);
mockUserInput(true, null);
RaftGroupService service =
- RaftGroupServiceImpl.start(groupId, cluster, FACTORY, TIMEOUT, NODES, false, DELAY, executor).get(3, TimeUnit.SECONDS);
+ RaftGroupServiceImpl.start(TEST_GRP, cluster, FACTORY, TIMEOUT, NODES, false, DELAY, executor).get(3, TimeUnit.SECONDS);
try {
service.run(new TestCommand()).get(500, TimeUnit.MILLISECONDS);
@@ -306,13 +297,11 @@ public class RaftGroupServiceTest {
*/
@Test
public void testUserRequestLeaderNotElected() throws Exception {
- String groupId = "test";
-
mockLeaderRequest(false);
mockUserInput(false, null);
RaftGroupService service =
- RaftGroupServiceImpl.start(groupId, cluster, FACTORY, TIMEOUT, NODES, true, DELAY, executor).get(3, TimeUnit.SECONDS);
+ RaftGroupServiceImpl.start(TEST_GRP, cluster, FACTORY, TIMEOUT, NODES, true, DELAY, executor).get(3, TimeUnit.SECONDS);
Peer leader = this.leader;
@@ -337,13 +326,11 @@ public class RaftGroupServiceTest {
*/
@Test
public void testUserRequestLeaderElectedAfterDelay() throws Exception {
- String groupId = "test";
-
mockLeaderRequest(false);
mockUserInput(false, null);
RaftGroupService service =
- RaftGroupServiceImpl.start(groupId, cluster, FACTORY, TIMEOUT, NODES, true, DELAY, executor).get(3, TimeUnit.SECONDS);
+ RaftGroupServiceImpl.start(TEST_GRP, cluster, FACTORY, TIMEOUT, NODES, true, DELAY, executor).get(3, TimeUnit.SECONDS);
Peer leader = this.leader;
@@ -373,13 +360,11 @@ public class RaftGroupServiceTest {
*/
@Test
public void testUserRequestLeaderElectedAfterDelayWithFailedNode() throws Exception {
- String groupId = "test";
-
mockLeaderRequest(false);
mockUserInput(false, NODES.get(0));
RaftGroupService service =
- RaftGroupServiceImpl.start(groupId, cluster, FACTORY, TIMEOUT * 3, NODES, true, DELAY, executor).get(3, TimeUnit.SECONDS);
+ RaftGroupServiceImpl.start(TEST_GRP, cluster, FACTORY, TIMEOUT * 3, NODES, true, DELAY, executor).get(3, TimeUnit.SECONDS);
Peer leader = this.leader;
@@ -411,13 +396,11 @@ public class RaftGroupServiceTest {
*/
@Test
public void testUserRequestLeaderChanged() throws Exception {
- String groupId = "test";
-
mockLeaderRequest(false);
mockUserInput(false, null);
RaftGroupService service =
- RaftGroupServiceImpl.start(groupId, cluster, FACTORY, TIMEOUT, NODES, true, DELAY, executor).get(3, TimeUnit.SECONDS);
+ RaftGroupServiceImpl.start(TEST_GRP, cluster, FACTORY, TIMEOUT, NODES, true, DELAY, executor).get(3, TimeUnit.SECONDS);
Peer leader = this.leader;
@@ -443,12 +426,10 @@ public class RaftGroupServiceTest {
*/
@Test
public void testSnapshotExecutionException() throws Exception {
- String groupId = "test";
-
mockSnapshotRequest(1);
RaftGroupService service =
- RaftGroupServiceImpl.start(groupId, cluster, FACTORY, TIMEOUT, NODES, false, DELAY, executor).get(3, TimeUnit.SECONDS);
+ RaftGroupServiceImpl.start(TEST_GRP, cluster, FACTORY, TIMEOUT, NODES, false, DELAY, executor).get(3, TimeUnit.SECONDS);
var addr = new NetworkAddress("localhost", 8082);
@@ -469,12 +450,10 @@ public class RaftGroupServiceTest {
*/
@Test
public void testSnapshotExecutionFailedResponse() throws Exception {
- String groupId = "test";
-
mockSnapshotRequest(0);
RaftGroupService service =
- RaftGroupServiceImpl.start(groupId, cluster, FACTORY, TIMEOUT, NODES, false, DELAY, executor).get(3, TimeUnit.SECONDS);
+ RaftGroupServiceImpl.start(TEST_GRP, cluster, FACTORY, TIMEOUT, NODES, false, DELAY, executor).get(3, TimeUnit.SECONDS);
var addr = new NetworkAddress("localhost", 8082);
@@ -495,20 +474,18 @@ public class RaftGroupServiceTest {
*/
@Test
public void testRefreshMembers() throws Exception {
- String groupId = "test";
-
List<String> respPeers = peersToIds(NODES.subList(0, 2));
List<String> respLearners = peersToIds(NODES.subList(2, 2));
when(messagingService.invoke(any(NetworkAddress.class),
- eq(FACTORY.getPeersRequest().onlyAlive(false).groupId(groupId).build()), anyLong()))
+ eq(FACTORY.getPeersRequest().onlyAlive(false).groupId(TEST_GRP.toString()).build()), anyLong()))
.then(invocation ->
completedFuture(FACTORY.getPeersResponse().peersList(respPeers).learnersList(respLearners).build()));
mockLeaderRequest(false);
RaftGroupService service =
- RaftGroupServiceImpl.start(groupId, cluster, FACTORY, TIMEOUT, NODES, true, DELAY, executor).get(3, TimeUnit.SECONDS);
+ RaftGroupServiceImpl.start(TEST_GRP, cluster, FACTORY, TIMEOUT, NODES, true, DELAY, executor).get(3, TimeUnit.SECONDS);
assertEquals(NODES, service.peers());
assertEquals(Collections.emptyList(), service.learners());
@@ -524,21 +501,19 @@ public class RaftGroupServiceTest {
*/
@Test
public void testAddPeer() throws Exception {
- String groupId = "test";
-
List<String> respPeers = peersToIds(NODES);
when(messagingService.invoke(any(NetworkAddress.class),
eq(FACTORY.addPeerRequest()
.peerId(PeerId.parsePeer(NODES.get(2).address().host() + ":" + NODES.get(2).address().port()).toString())
- .groupId(groupId).build()), anyLong()))
+ .groupId(TEST_GRP.toString()).build()), anyLong()))
.then(invocation ->
completedFuture(FACTORY.addPeerResponse().newPeersList(respPeers).build()));
mockLeaderRequest(false);
RaftGroupService service =
- RaftGroupServiceImpl.start(groupId, cluster, FACTORY, TIMEOUT, NODES.subList(0, 2), true, DELAY, executor).get(3, TimeUnit.SECONDS);
+ RaftGroupServiceImpl.start(TEST_GRP, cluster, FACTORY, TIMEOUT, NODES.subList(0, 2), true, DELAY, executor).get(3, TimeUnit.SECONDS);
assertEquals(NODES.subList(0, 2), service.peers());
assertEquals(Collections.emptyList(), service.learners());
@@ -554,21 +529,19 @@ public class RaftGroupServiceTest {
*/
@Test
public void testRemovePeer() throws Exception {
- String groupId = "test";
-
List<String> respPeers = peersToIds(NODES.subList(0, 2));
when(messagingService.invoke(any(NetworkAddress.class),
eq(FACTORY.removePeerRequest()
.peerId(PeerId.parsePeer(NODES.get(2).address().host() + ":" + NODES.get(2).address().port()).toString())
- .groupId(groupId).build()), anyLong()))
+ .groupId(TEST_GRP.toString()).build()), anyLong()))
.then(invocation ->
completedFuture(FACTORY.removePeerResponse().newPeersList(respPeers).build()));
mockLeaderRequest(false);
RaftGroupService service =
- RaftGroupServiceImpl.start(groupId, cluster, FACTORY, TIMEOUT, NODES, true, DELAY, executor).get(3, TimeUnit.SECONDS);
+ RaftGroupServiceImpl.start(TEST_GRP, cluster, FACTORY, TIMEOUT, NODES, true, DELAY, executor).get(3, TimeUnit.SECONDS);
assertEquals(NODES, service.peers());
assertEquals(Collections.emptyList(), service.learners());
@@ -584,8 +557,6 @@ public class RaftGroupServiceTest {
*/
@Test
public void testChangePeers() throws Exception {
- String groupId = "test";
-
List<String> shrunkPeers = peersToIds(NODES.subList(0, 1));
List<String> extendedPeers = peersToIds(NODES);
@@ -593,21 +564,21 @@ public class RaftGroupServiceTest {
when(messagingService.invoke(any(NetworkAddress.class),
eq(FACTORY.changePeersRequest()
.newPeersList(shrunkPeers)
- .groupId(groupId).build()), anyLong()))
+ .groupId(TEST_GRP.toString()).build()), anyLong()))
.then(invocation ->
completedFuture(FACTORY.changePeersResponse().newPeersList(shrunkPeers).build()));
when(messagingService.invoke(any(NetworkAddress.class),
eq(FACTORY.changePeersRequest()
.newPeersList(extendedPeers)
- .groupId(groupId).build()), anyLong()))
+ .groupId(TEST_GRP.toString()).build()), anyLong()))
.then(invocation ->
completedFuture(FACTORY.changePeersResponse().newPeersList(extendedPeers).build()));
mockLeaderRequest(false);
RaftGroupService service =
- RaftGroupServiceImpl.start(groupId, cluster, FACTORY, TIMEOUT, NODES.subList(0, 2), true, DELAY, executor).get(3, TimeUnit.SECONDS);
+ RaftGroupServiceImpl.start(TEST_GRP, cluster, FACTORY, TIMEOUT, NODES.subList(0, 2), true, DELAY, executor).get(3, TimeUnit.SECONDS);
assertEquals(NODES.subList(0, 2), service.peers());
assertEquals(Collections.emptyList(), service.learners());
@@ -628,20 +599,18 @@ public class RaftGroupServiceTest {
*/
@Test
public void testTransferLeadership() throws Exception {
- String groupId = "test";
-
when(messagingService.invoke(any(NetworkAddress.class),
eq(FACTORY.transferLeaderRequest()
.peerId(PeerId.fromPeer(NODES.get(1)).toString())
.leaderId(PeerId.fromPeer(NODES.get(0)).toString())
- .groupId(groupId).build()), anyLong()))
+ .groupId(TEST_GRP.toString()).build()), anyLong()))
.then(invocation ->
completedFuture(RaftRpcFactory.DEFAULT.newResponse(FACTORY, Status.OK())));
mockLeaderRequest(false);
RaftGroupService service =
- RaftGroupServiceImpl.start(groupId, cluster, FACTORY, TIMEOUT, NODES, true, DELAY, executor).get(3, TimeUnit.SECONDS);
+ RaftGroupServiceImpl.start(TEST_GRP, cluster, FACTORY, TIMEOUT, NODES, true, DELAY, executor).get(3, TimeUnit.SECONDS);
assertEquals(NODES.get(0), service.leader());
@@ -655,21 +624,19 @@ public class RaftGroupServiceTest {
*/
@Test
public void testAddLearners() throws Exception {
- String groupId = "test";
-
List<String> addLearners = peersToIds(NODES.subList(1, 3));
when(messagingService.invoke(any(NetworkAddress.class),
eq(FACTORY.addLearnersRequest()
.learnersList(addLearners)
- .groupId(groupId).build()), anyLong()))
+ .groupId(TEST_GRP.toString()).build()), anyLong()))
.then(invocation ->
completedFuture(FACTORY.learnersOpResponse().newLearnersList(addLearners).build()));
mockLeaderRequest(false);
RaftGroupService service =
- RaftGroupServiceImpl.start(groupId, cluster, FACTORY, TIMEOUT, NODES.subList(0, 1), true, DELAY, executor).get(3, TimeUnit.SECONDS);
+ RaftGroupServiceImpl.start(TEST_GRP, cluster, FACTORY, TIMEOUT, NODES.subList(0, 1), true, DELAY, executor).get(3, TimeUnit.SECONDS);
assertEquals(NODES.subList(0, 1), service.peers());
assertEquals(Collections.emptyList(), service.learners());
@@ -685,8 +652,6 @@ public class RaftGroupServiceTest {
*/
@Test
public void testResetLearners() throws Exception {
- String groupId = "test";
-
List<String> addLearners = peersToIds(NODES.subList(1, 3));
List<String> resetLearners = peersToIds(NODES.subList(2, 3));
@@ -694,16 +659,16 @@ public class RaftGroupServiceTest {
when(messagingService.invoke(any(NetworkAddress.class),
eq(FACTORY.resetLearnersRequest()
.learnersList(resetLearners)
- .groupId(groupId).build()), anyLong()))
+ .groupId(TEST_GRP.toString()).build()), anyLong()))
.then(invocation ->
completedFuture(FACTORY.learnersOpResponse().newLearnersList(resetLearners).build()));
- mockAddLearners(groupId, addLearners, addLearners);
+ mockAddLearners(TEST_GRP.toString(), addLearners, addLearners);
mockLeaderRequest(false);
RaftGroupService service =
- RaftGroupServiceImpl.start(groupId, cluster, FACTORY, TIMEOUT, NODES.subList(0, 1), true, DELAY, executor).get(3, TimeUnit.SECONDS);
+ RaftGroupServiceImpl.start(TEST_GRP, cluster, FACTORY, TIMEOUT, NODES.subList(0, 1), true, DELAY, executor).get(3, TimeUnit.SECONDS);
service.addLearners(NODES.subList(1, 3)).get();
@@ -721,8 +686,6 @@ public class RaftGroupServiceTest {
*/
@Test
public void testRemoveLearners() throws Exception {
- String groupId = "test";
-
List<String> addLearners = peersToIds(NODES.subList(1, 3));
List<String> removeLearners = peersToIds(NODES.subList(2, 3));
@@ -733,16 +696,16 @@ public class RaftGroupServiceTest {
when(messagingService.invoke(any(NetworkAddress.class),
eq(FACTORY.removeLearnersRequest()
.learnersList(removeLearners)
- .groupId(groupId).build()), anyLong()))
+ .groupId(TEST_GRP.toString()).build()), anyLong()))
.then(invocation ->
completedFuture(FACTORY.learnersOpResponse().newLearnersList(resultLearners).build()));
- mockAddLearners(groupId, addLearners, addLearners);
+ mockAddLearners(TEST_GRP.toString(), addLearners, addLearners);
mockLeaderRequest(false);
RaftGroupService service =
- RaftGroupServiceImpl.start(groupId, cluster, FACTORY, TIMEOUT, NODES.subList(0, 1), true, DELAY, executor).get(3, TimeUnit.SECONDS);
+ RaftGroupServiceImpl.start(TEST_GRP, cluster, FACTORY, TIMEOUT, NODES.subList(0, 1), true, DELAY, executor).get(3, TimeUnit.SECONDS);
service.addLearners(NODES.subList(1, 3)).get();
@@ -758,18 +721,16 @@ public class RaftGroupServiceTest {
/** */
@Test
public void testGetLeaderRequest() throws Exception {
- String groupId = "test";
-
mockLeaderRequest(false);
RaftGroupService service =
- RaftGroupServiceImpl.start(groupId, cluster, FACTORY, TIMEOUT, NODES, false, DELAY, executor).get(3, TimeUnit.SECONDS);
+ RaftGroupServiceImpl.start(TEST_GRP, cluster, FACTORY, TIMEOUT, NODES, false, DELAY, executor).get(3, TimeUnit.SECONDS);
assertNull(service.leader());
service.refreshLeader().get();
- GetLeaderRequest req = FACTORY.getLeaderRequest().groupId(groupId).build();
+ GetLeaderRequest req = FACTORY.getLeaderRequest().groupId(TEST_GRP.toString()).build();
GetLeaderResponse fut = (GetLeaderResponse) messagingService.invoke(leader.address(), req, TIMEOUT).get();
@@ -886,4 +847,37 @@ public class RaftGroupServiceTest {
/** */
private static class TestResponse {
}
+
+ /**
+ * Test replication group id.
+ */
+ protected static class TestReplicationGroupId implements ReplicationGroupId {
+ private final String name;
+
+ public TestReplicationGroupId(String name) {
+ this.name = name;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ TestReplicationGroupId that = (TestReplicationGroupId) o;
+ return Objects.equals(name, that.name);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(name);
+ }
+
+ @Override
+ public String toString() {
+ return name;
+ }
+ }
}
diff --git a/modules/raft/src/testFixtures/java/org/apache/ignite/internal/raft/server/impl/RaftServerImpl.java b/modules/raft/src/testFixtures/java/org/apache/ignite/internal/raft/server/impl/RaftServerImpl.java
index 8d73188779..d1a6841fd9 100644
--- a/modules/raft/src/testFixtures/java/org/apache/ignite/internal/raft/server/impl/RaftServerImpl.java
+++ b/modules/raft/src/testFixtures/java/org/apache/ignite/internal/raft/server/impl/RaftServerImpl.java
@@ -33,6 +33,7 @@ import org.apache.ignite.internal.logger.Loggers;
import org.apache.ignite.internal.raft.server.RaftGroupEventsListener;
import org.apache.ignite.internal.raft.server.RaftGroupOptions;
import org.apache.ignite.internal.raft.server.RaftServer;
+import org.apache.ignite.internal.replicator.ReplicationGroupId;
import org.apache.ignite.lang.IgniteStringFormatter;
import org.apache.ignite.lang.NodeStoppingException;
import org.apache.ignite.network.ClusterService;
@@ -67,7 +68,9 @@ public class RaftServerImpl implements RaftServer {
private final ClusterService service;
- private final ConcurrentMap<String, RaftGroupListener> listeners = new ConcurrentHashMap<>();
+ private final ConcurrentMap<ReplicationGroupId, RaftGroupListener> listeners = new ConcurrentHashMap<>();
+
+ private final ConcurrentMap<String, RaftGroupListener> strListeners = new ConcurrentHashMap<>();
private final BlockingQueue<CommandClosureEx<ReadCommand>> readQueue;
@@ -112,7 +115,7 @@ public class RaftServerImpl implements RaftServer {
} else if (message instanceof ActionRequest) {
ActionRequest req0 = (ActionRequest) message;
- RaftGroupListener lsnr = listeners.get(req0.groupId());
+ RaftGroupListener lsnr = strListeners.get(req0.groupId());
if (lsnr == null) {
sendError(senderAddr, correlationId, RaftError.UNKNOWN);
@@ -177,13 +180,14 @@ public class RaftServerImpl implements RaftServer {
/** {@inheritDoc} */
@Override
- public synchronized boolean startRaftGroup(String groupId, RaftGroupListener lsnr,
+ public synchronized boolean startRaftGroup(ReplicationGroupId groupId, RaftGroupListener lsnr,
List<Peer> initialConf, RaftGroupOptions groupOptions) {
if (listeners.containsKey(groupId)) {
return false;
}
listeners.put(groupId, lsnr);
+ strListeners.put(groupId.toString(), lsnr);
return true;
}
@@ -191,7 +195,7 @@ public class RaftServerImpl implements RaftServer {
/** {@inheritDoc} */
@Override
public boolean startRaftGroup(
- String groupId,
+ ReplicationGroupId groupId,
RaftGroupEventsListener evLsnr,
RaftGroupListener lsnr,
List<Peer> initialConf,
@@ -202,19 +206,21 @@ public class RaftServerImpl implements RaftServer {
/** {@inheritDoc} */
@Override
- public synchronized boolean stopRaftGroup(String groupId) {
+ public synchronized boolean stopRaftGroup(ReplicationGroupId groupId) {
+ strListeners.remove(groupId.toString());
+
return listeners.remove(groupId) != null;
}
/** {@inheritDoc} */
@Override
- public @Nullable Peer localPeer(String groupId) {
+ public @Nullable Peer localPeer(ReplicationGroupId groupId) {
return new Peer(service.topologyService().localMember().address());
}
/** {@inheritDoc} */
@Override
- public Set<String> startedGroups() {
+ public Set<ReplicationGroupId> startedGroups() {
return listeners.keySet();
}
diff --git a/modules/raft/src/testFixtures/java/org/apache/ignite/raft/client/service/ItAbstractListenerSnapshotTest.java b/modules/raft/src/testFixtures/java/org/apache/ignite/raft/client/service/ItAbstractListenerSnapshotTest.java
index b0c7957fd1..144cfa857c 100644
--- a/modules/raft/src/testFixtures/java/org/apache/ignite/raft/client/service/ItAbstractListenerSnapshotTest.java
+++ b/modules/raft/src/testFixtures/java/org/apache/ignite/raft/client/service/ItAbstractListenerSnapshotTest.java
@@ -26,6 +26,7 @@ import java.net.UnknownHostException;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;
+import java.util.Objects;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
@@ -36,6 +37,7 @@ import org.apache.ignite.internal.logger.IgniteLogger;
import org.apache.ignite.internal.logger.Loggers;
import org.apache.ignite.internal.raft.Loza;
import org.apache.ignite.internal.raft.server.impl.JraftServerImpl;
+import org.apache.ignite.internal.replicator.ReplicationGroupId;
import org.apache.ignite.internal.testframework.WorkDirectory;
import org.apache.ignite.internal.testframework.WorkDirectoryExtension;
import org.apache.ignite.internal.thread.NamedThreadFactory;
@@ -301,7 +303,7 @@ public abstract class ItAbstractListenerSnapshotTest<T extends RaftGroupListener
/**
* Returns raft group id for tests.
*/
- public abstract String raftGroupId();
+ public abstract TestReplicationGroupId raftGroupId();
/**
* Get the raft group listener from the jraft server.
@@ -310,7 +312,7 @@ public abstract class ItAbstractListenerSnapshotTest<T extends RaftGroupListener
* @param grpId Raft group id.
* @return Raft group listener.
*/
- protected T getListener(JraftServerImpl server, String grpId) {
+ protected T getListener(JraftServerImpl server, TestReplicationGroupId grpId) {
org.apache.ignite.raft.jraft.RaftGroupService svc = server.raftGroupService(grpId);
JraftServerImpl.DelegatingStateMachine fsm =
@@ -427,7 +429,7 @@ public abstract class ItAbstractListenerSnapshotTest<T extends RaftGroupListener
*
* @return The service.
*/
- private RaftGroupService startClient(TestInfo testInfo, String groupId, NetworkAddress addr) throws Exception {
+ private RaftGroupService startClient(TestInfo testInfo, TestReplicationGroupId groupId, NetworkAddress addr) throws Exception {
ClusterService clientNode = clusterService(testInfo, CLIENT_PORT + clients.size(), addr);
RaftGroupService client = RaftGroupServiceImpl.start(groupId, clientNode, FACTORY, 10_000,
@@ -440,4 +442,37 @@ public abstract class ItAbstractListenerSnapshotTest<T extends RaftGroupListener
return client;
}
+
+ /**
+ * Test replication group id.
+ */
+ protected static class TestReplicationGroupId implements ReplicationGroupId {
+ private final String name;
+
+ public TestReplicationGroupId(String name) {
+ this.name = name;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ TestReplicationGroupId that = (TestReplicationGroupId) o;
+ return Objects.equals(name, that.name);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(name);
+ }
+
+ @Override
+ public String toString() {
+ return name;
+ }
+ }
}
diff --git a/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/Replica.java b/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/Replica.java
index 56af09bcff..447cec4c81 100644
--- a/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/Replica.java
+++ b/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/Replica.java
@@ -27,7 +27,7 @@ import org.apache.ignite.lang.IgniteStringFormatter;
*/
public class Replica {
/** Replica group identity, this id is the same as the considered partition's id. */
- private final String replicaGrpId;
+ private final ReplicationGroupId replicaGrpId;
/** Replica listener. */
private final ReplicaListener listener;
@@ -39,7 +39,7 @@ public class Replica {
* @param listener Replica listener.
*/
public Replica(
- String replicaGrpId,
+ ReplicationGroupId replicaGrpId,
ReplicaListener listener
) {
this.replicaGrpId = replicaGrpId;
diff --git a/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaManager.java b/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaManager.java
index ee83676565..e91c644f1e 100644
--- a/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaManager.java
+++ b/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaManager.java
@@ -43,6 +43,7 @@ import org.apache.ignite.network.NetworkAddress;
import org.apache.ignite.network.NetworkMessage;
import org.apache.ignite.network.NetworkMessageHandler;
import org.jetbrains.annotations.Nullable;
+import org.jetbrains.annotations.TestOnly;
/**
* Replica manager maintains {@link Replica} instances on an Ignite node.
@@ -70,7 +71,7 @@ public class ReplicaManager implements IgniteComponent {
private final NetworkMessageHandler handler;
/** Replicas. */
- private final ConcurrentHashMap<String, Replica> replicas = new ConcurrentHashMap<>();
+ private final ConcurrentHashMap<ReplicationGroupId, Replica> replicas = new ConcurrentHashMap<>();
/** A hybrid logical clock. */
private final HybridClock clock;
@@ -144,7 +145,7 @@ public class ReplicaManager implements IgniteComponent {
* @return Instance of the replica or {@code null} if the replica is not started.
* @throws NodeStoppingException If the node is stopping.
*/
- public Replica replica(String replicaGrpId) throws NodeStoppingException {
+ public Replica replica(ReplicationGroupId replicaGrpId) throws NodeStoppingException {
if (!busyLock.enterBusy()) {
throw new NodeStoppingException();
}
@@ -166,7 +167,7 @@ public class ReplicaManager implements IgniteComponent {
* @throws ReplicaIsAlreadyStartedException Is thrown when a replica with the same replication group id has already been started.
*/
public Replica startReplica(
- String replicaGrpId,
+ ReplicationGroupId replicaGrpId,
ReplicaListener listener) throws NodeStoppingException {
if (!busyLock.enterBusy()) {
throw new NodeStoppingException();
@@ -186,7 +187,7 @@ public class ReplicaManager implements IgniteComponent {
* @param listener Replica listener.
* @return New replica.
*/
- private Replica startReplicaInternal(String replicaGrpId, ReplicaListener listener) {
+ private Replica startReplicaInternal(ReplicationGroupId replicaGrpId, ReplicaListener listener) {
var replica = new Replica(replicaGrpId, listener);
Replica previous = replicas.putIfAbsent(replicaGrpId, replica);
@@ -205,7 +206,7 @@ public class ReplicaManager implements IgniteComponent {
* @return True if the replica is found and closed, false otherwise.
* @throws NodeStoppingException If the node is stopping.
*/
- public boolean stopReplica(String replicaGrpId) throws NodeStoppingException {
+ public boolean stopReplica(ReplicationGroupId replicaGrpId) throws NodeStoppingException {
if (!busyLock.enterBusy()) {
throw new NodeStoppingException();
}
@@ -223,7 +224,7 @@ public class ReplicaManager implements IgniteComponent {
* @param replicaGrpId Replication group id.
* @return True if the replica is found and closed, false otherwise.
*/
- private boolean stopReplicaInternal(String replicaGrpId) {
+ private boolean stopReplicaInternal(ReplicationGroupId replicaGrpId) {
return replicas.remove(replicaGrpId) != null;
}
@@ -339,4 +340,14 @@ public class ReplicaManager implements IgniteComponent {
.build();
}
}
+
+ /**
+ * Returns started replication groups.
+ *
+ * @return Set of started replication groups.
+ */
+ @TestOnly
+ public Set<ReplicationGroupId> startedGroups() {
+ return replicas.keySet();
+ }
}
diff --git a/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/exception/ReplicaIsAlreadyStartedException.java b/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/exception/ReplicaIsAlreadyStartedException.java
index c3fa86cb1b..ded9b2d7d1 100644
--- a/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/exception/ReplicaIsAlreadyStartedException.java
+++ b/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/exception/ReplicaIsAlreadyStartedException.java
@@ -18,6 +18,7 @@
package org.apache.ignite.internal.replicator.exception;
import java.util.UUID;
+import org.apache.ignite.internal.replicator.ReplicationGroupId;
import org.apache.ignite.lang.ErrorGroups.Replicator;
import org.apache.ignite.lang.IgniteInternalException;
import org.apache.ignite.lang.IgniteStringFormatter;
@@ -32,7 +33,7 @@ public class ReplicaIsAlreadyStartedException extends IgniteInternalException {
*
* @param replicaGrpId Replication group id.
*/
- public ReplicaIsAlreadyStartedException(String replicaGrpId) {
+ public ReplicaIsAlreadyStartedException(ReplicationGroupId replicaGrpId) {
super(Replicator.REPLICA_IS_ALREADY_STARTED_ERR,
IgniteStringFormatter.format("Replica is started already [replicaGrpId={}]", replicaGrpId));
}
diff --git a/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/exception/ReplicaUnavailableException.java b/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/exception/ReplicaUnavailableException.java
index 2aa81e9f6e..5c3f61eae0 100644
--- a/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/exception/ReplicaUnavailableException.java
+++ b/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/exception/ReplicaUnavailableException.java
@@ -20,6 +20,7 @@ package org.apache.ignite.internal.replicator.exception;
import static org.apache.ignite.lang.ErrorGroups.Replicator.REPLICA_UNAVAILABLE_ERR;
import java.util.UUID;
+import org.apache.ignite.internal.replicator.ReplicationGroupId;
import org.apache.ignite.lang.IgniteInternalException;
import org.apache.ignite.network.ClusterNode;
@@ -33,7 +34,7 @@ public class ReplicaUnavailableException extends IgniteInternalException {
* @param groupId Replication group id.
* @param node Node.
*/
- public ReplicaUnavailableException(String groupId, ClusterNode node) {
+ public ReplicaUnavailableException(ReplicationGroupId groupId, ClusterNode node) {
super(REPLICA_UNAVAILABLE_ERR, "Replica is not ready [replicationGroupId=" + groupId + ", nodeName=" + node.name() + ']');
}
diff --git a/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/exception/ReplicationException.java b/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/exception/ReplicationException.java
index ff4a0ee57a..25ebce1186 100644
--- a/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/exception/ReplicationException.java
+++ b/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/exception/ReplicationException.java
@@ -20,6 +20,7 @@ package org.apache.ignite.internal.replicator.exception;
import static org.apache.ignite.lang.ErrorGroups.Replicator.REPLICA_COMMON_ERR;
import java.util.UUID;
+import org.apache.ignite.internal.replicator.ReplicationGroupId;
import org.apache.ignite.lang.IgniteInternalException;
/**
@@ -31,7 +32,7 @@ public class ReplicationException extends IgniteInternalException {
*
* @param replicaGrpId Replication group id.
*/
- public ReplicationException(String replicaGrpId) {
+ public ReplicationException(ReplicationGroupId replicaGrpId) {
this(replicaGrpId, null);
}
@@ -41,7 +42,7 @@ public class ReplicationException extends IgniteInternalException {
* @param replicaGrpId Replication group id.
* @param cause Optional nested exception (can be {@code null}).
*/
- public ReplicationException(String replicaGrpId, Throwable cause) {
+ public ReplicationException(ReplicationGroupId replicaGrpId, Throwable cause) {
this(REPLICA_COMMON_ERR, "Failed to process replica request [replicaGroupId=" + replicaGrpId + ']', cause);
}
diff --git a/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/exception/ReplicationTimeoutException.java b/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/exception/ReplicationTimeoutException.java
index d6993350b6..4941436346 100644
--- a/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/exception/ReplicationTimeoutException.java
+++ b/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/exception/ReplicationTimeoutException.java
@@ -18,6 +18,7 @@
package org.apache.ignite.internal.replicator.exception;
import java.util.UUID;
+import org.apache.ignite.internal.replicator.ReplicationGroupId;
import org.apache.ignite.lang.ErrorGroups.Replicator;
import org.apache.ignite.lang.IgniteInternalException;
import org.apache.ignite.lang.IgniteStringFormatter;
@@ -31,7 +32,7 @@ public class ReplicationTimeoutException extends IgniteInternalException {
*
* @param replicaGrpId Replication group id.
*/
- public ReplicationTimeoutException(String replicaGrpId) {
+ public ReplicationTimeoutException(ReplicationGroupId replicaGrpId) {
super(Replicator.REPLICA_TIMEOUT_ERR, IgniteStringFormatter.format("Replication is timed out [replicaGrpId={}]", replicaGrpId));
}
diff --git a/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/message/ReplicaRequest.java b/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/message/ReplicaRequest.java
index 482f6fea3e..bcc6932e5f 100644
--- a/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/message/ReplicaRequest.java
+++ b/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/message/ReplicaRequest.java
@@ -17,7 +17,9 @@
package org.apache.ignite.internal.replicator.message;
+import org.apache.ignite.internal.replicator.ReplicationGroupId;
import org.apache.ignite.network.NetworkMessage;
+import org.apache.ignite.network.annotations.Marshallable;
/**
* Replica request.
@@ -28,5 +30,6 @@ public interface ReplicaRequest extends NetworkMessage {
*
* @return Replication group id.
*/
- String groupId();
+ @Marshallable
+ ReplicationGroupId groupId();
}
diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/configuration/storage/ItRebalanceDistributedTest.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/configuration/storage/ItRebalanceDistributedTest.java
index b9ae94b97a..83c9c25382 100644
--- a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/configuration/storage/ItRebalanceDistributedTest.java
+++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/configuration/storage/ItRebalanceDistributedTest.java
@@ -58,6 +58,7 @@ import org.apache.ignite.internal.raft.server.impl.JraftServerImpl;
import org.apache.ignite.internal.raft.storage.impl.LocalLogStorageFactory;
import org.apache.ignite.internal.replicator.ReplicaManager;
import org.apache.ignite.internal.replicator.ReplicaService;
+import org.apache.ignite.internal.replicator.ReplicationGroupId;
import org.apache.ignite.internal.rest.configuration.RestConfiguration;
import org.apache.ignite.internal.schema.SchemaManager;
import org.apache.ignite.internal.schema.configuration.ExtendedTableConfiguration;
@@ -264,8 +265,8 @@ public class ItRebalanceDistributedTest {
var countDownLatch = new CountDownLatch(1);
- String raftGroupNodeName = leaderNode.raftManager.server().startedGroups()
- .stream().filter(grp -> grp.contains("part")).findFirst().get();
+ ReplicationGroupId raftGroupNodeName = leaderNode.raftManager.server().startedGroups()
+ .stream().filter(grp -> grp.toString().contains("part")).findFirst().get();
((JraftServerImpl) leaderNode.raftManager.server()).blockMessages(
raftGroupNodeName, (msg, node) -> {
@@ -313,12 +314,13 @@ public class ItRebalanceDistributedTest {
waitPartitionAssignmentsSyncedToExpected(0, 1);
JraftServerImpl raftServer = (JraftServerImpl) nodes.stream()
- .filter(n -> n.raftManager.startedGroups().stream().anyMatch(grp -> grp.contains("_part_"))).findFirst()
+ .filter(n -> n.raftManager.startedGroups().stream().anyMatch(grp -> grp.toString().contains("_part_"))).findFirst()
.get().raftManager.server();
AtomicInteger counter = new AtomicInteger(0);
- String partGrpId = raftServer.startedGroups().stream().filter(grp -> grp.contains("_part_")).findFirst().get();
+ ReplicationGroupId partGrpId = raftServer.startedGroups().stream().filter(grp -> grp.toString().contains("_part_")).findFirst()
+ .get();
raftServer.blockMessages(partGrpId, (msg, node) -> {
if (msg instanceof RpcRequests.PingRequest) {
diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteInMemoryNodeRestartTest.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteInMemoryNodeRestartTest.java
index 2a4946b39e..7363c5428a 100644
--- a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteInMemoryNodeRestartTest.java
+++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteInMemoryNodeRestartTest.java
@@ -27,6 +27,7 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;
+import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.IntFunction;
@@ -36,6 +37,7 @@ import org.apache.ignite.IgnitionManager;
import org.apache.ignite.internal.app.IgniteImpl;
import org.apache.ignite.internal.raft.Loza;
import org.apache.ignite.internal.table.TableImpl;
+import org.apache.ignite.internal.table.distributed.replicator.TablePartitionId;
import org.apache.ignite.internal.testframework.IgniteAbstractTest;
import org.apache.ignite.internal.testframework.IgniteTestUtils;
import org.apache.ignite.internal.util.IgniteUtils;
@@ -187,7 +189,7 @@ public class ItIgniteInMemoryNodeRestartTest extends IgniteAbstractTest {
createTableWithData(ignite, TABLE_NAME, 3, 1);
TableImpl table = (TableImpl) ignite.tables().table(TABLE_NAME);
- String tableId = table.tableId().toString();
+ UUID tableId = table.tableId();
// Find the leader of the table's partition group.
RaftGroupService raftGroupService = table.internalTable().partitionRaftGroupService(0);
@@ -208,7 +210,13 @@ public class ItIgniteInMemoryNodeRestartTest extends IgniteAbstractTest {
// Check that it restarts.
assertTrue(IgniteTestUtils.waitForCondition(
- () -> loza.startedGroups().stream().anyMatch(grpName -> grpName.contains(tableId)),
+ () -> loza.startedGroups().stream().anyMatch(grpName -> {
+ if (grpName instanceof TablePartitionId) {
+ return ((TablePartitionId) grpName).getTableId().equals(tableId);
+ }
+
+ return true;
+ }),
TimeUnit.SECONDS.toMillis(10)
));
@@ -231,7 +239,7 @@ public class ItIgniteInMemoryNodeRestartTest extends IgniteAbstractTest {
createTableWithData(ignite0, TABLE_NAME, 3, 1);
TableImpl table = (TableImpl) ignite0.tables().table(TABLE_NAME);
- String tableId = table.tableId().toString();
+ UUID tableId = table.tableId();
// Lose the majority.
stopNode(1);
@@ -243,7 +251,13 @@ public class ItIgniteInMemoryNodeRestartTest extends IgniteAbstractTest {
// Check that it restarts.
assertTrue(IgniteTestUtils.waitForCondition(
- () -> loza.startedGroups().stream().anyMatch(grpName -> grpName.contains(tableId)),
+ () -> loza.startedGroups().stream().anyMatch(grpName -> {
+ if (grpName instanceof TablePartitionId) {
+ return ((TablePartitionId) grpName).getTableId().equals(tableId);
+ }
+
+ return true;
+ }),
TimeUnit.SECONDS.toMillis(10)
));
@@ -265,7 +279,7 @@ public class ItIgniteInMemoryNodeRestartTest extends IgniteAbstractTest {
createTableWithData(ignite0, TABLE_NAME, 3, 1);
TableImpl table = (TableImpl) ignite0.tables().table(TABLE_NAME);
- String tableId = table.tableId().toString();
+ UUID tableId = table.tableId();
stopNode(0);
stopNode(1);
@@ -280,7 +294,13 @@ public class ItIgniteInMemoryNodeRestartTest extends IgniteAbstractTest {
Loza loza = ignite(i).raftManager();
assertTrue(IgniteTestUtils.waitForCondition(
- () -> loza.startedGroups().stream().anyMatch(grpName -> grpName.contains(tableId)),
+ () -> loza.startedGroups().stream().anyMatch(grpName -> {
+ if (grpName instanceof TablePartitionId) {
+ return ((TablePartitionId) grpName).getTableId().equals(tableId);
+ }
+
+ return true;
+ }),
TimeUnit.SECONDS.toMillis(10)
));
}
diff --git a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTablePersistenceTest.java b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTablePersistenceTest.java
index be2bf5a686..70e27682d7 100644
--- a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTablePersistenceTest.java
+++ b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTablePersistenceTest.java
@@ -248,8 +248,8 @@ public class ItTablePersistenceTest extends ItAbstractListenerSnapshotTest<Parti
/** {@inheritDoc} */
@Override
- public String raftGroupId() {
- return "partitions";
+ public TestReplicationGroupId raftGroupId() {
+ return new TestReplicationGroupId("partitions");
}
/**
diff --git a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTxDistributedTestSingleNode.java b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTxDistributedTestSingleNode.java
index f380f04e36..658737a72d 100644
--- a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTxDistributedTestSingleNode.java
+++ b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTxDistributedTestSingleNode.java
@@ -51,6 +51,7 @@ import org.apache.ignite.internal.raft.server.RaftGroupOptions;
import org.apache.ignite.internal.raft.server.impl.JraftServerImpl;
import org.apache.ignite.internal.replicator.ReplicaManager;
import org.apache.ignite.internal.replicator.ReplicaService;
+import org.apache.ignite.internal.replicator.ReplicationGroupId;
import org.apache.ignite.internal.storage.MvPartitionStorage;
import org.apache.ignite.internal.storage.RowId;
import org.apache.ignite.internal.storage.engine.MvTableStorage;
@@ -61,6 +62,7 @@ import org.apache.ignite.internal.table.distributed.TableMessageGroup;
import org.apache.ignite.internal.table.distributed.raft.PartitionListener;
import org.apache.ignite.internal.table.distributed.replicator.PartitionReplicaListener;
import org.apache.ignite.internal.table.distributed.replicator.PlacementDriver;
+import org.apache.ignite.internal.table.distributed.replicator.TablePartitionId;
import org.apache.ignite.internal.table.distributed.storage.InternalTableImpl;
import org.apache.ignite.internal.table.impl.DummySchemaManagerImpl;
import org.apache.ignite.internal.thread.NamedThreadFactory;
@@ -362,7 +364,7 @@ public class ItTxDistributedTestSingleNode extends TxAbstractTest {
for (int p = 0; p < assignment.size(); p++) {
List<ClusterNode> partNodes = assignment.get(p);
- String grpId = name + "-part-" + p;
+ TablePartitionId grpId = new TablePartitionId(tblId, p);
List<Peer> conf = partNodes.stream().map(n -> n.address()).map(Peer::new)
.collect(Collectors.toList());
@@ -373,7 +375,7 @@ public class ItTxDistributedTestSingleNode extends TxAbstractTest {
var placementDriver = new PlacementDriver(replicaServices.get(node));
for (int part = 0; part < assignment.size(); part++) {
- String replicaGrpId = name + "-part-" + part;
+ ReplicationGroupId replicaGrpId = new TablePartitionId(tblId, part);
placementDriver.updateAssignment(replicaGrpId, assignment.get(part));
}
@@ -398,14 +400,13 @@ public class ItTxDistributedTestSingleNode extends TxAbstractTest {
raftSvc -> {
try {
replicaManagers.get(node).startReplica(
- grpId,
+ new TablePartitionId(tblId, partId),
new PartitionReplicaListener(
testMpPartStorage,
raftSvc,
txManagers.get(node),
txManagers.get(node).lockManager(),
partId,
- grpId,
tblId,
primaryIndex,
clocks.get(node),
@@ -494,11 +495,15 @@ public class ItTxDistributedTestSingleNode extends TxAbstractTest {
ReplicaManager replicaMgr = replicaManagers.get(entry.getKey());
- Set<String> grps = rs.startedGroups();
+ Set<ReplicationGroupId> replicaGrps = replicaMgr.startedGroups();
- for (String grp : grps) {
+ for (ReplicationGroupId grp : replicaGrps) {
replicaMgr.stopReplica(grp);
+ }
+
+ Set<ReplicationGroupId> grps = rs.startedGroups();
+ for (ReplicationGroupId grp : grps) {
rs.stopRaftGroup(grp);
}
@@ -559,13 +564,13 @@ public class ItTxDistributedTestSingleNode extends TxAbstractTest {
/** {@inheritDoc} */
@Override
- protected boolean assertPartitionsSame(Table table, int partId) {
+ protected boolean assertPartitionsSame(TableImpl table, int partId) {
int hash = 0;
for (Map.Entry<ClusterNode, Loza> entry : raftServers.entrySet()) {
Loza svc = (Loza) entry.getValue();
JraftServerImpl server = (JraftServerImpl) svc.server();
- org.apache.ignite.raft.jraft.RaftGroupService grp = server.raftGroupService(table.name() + "-part-" + partId);
+ org.apache.ignite.raft.jraft.RaftGroupService grp = server.raftGroupService(new TablePartitionId(table.tableId(), partId));
JraftServerImpl.DelegatingStateMachine fsm = (JraftServerImpl.DelegatingStateMachine) grp
.getRaftNode().getOptions().getFsm();
PartitionListener listener = (PartitionListener) fsm.getListener();
diff --git a/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/ItColocationTest.java b/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/ItColocationTest.java
index 051c179b3b..1b20a55077 100644
--- a/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/ItColocationTest.java
+++ b/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/ItColocationTest.java
@@ -53,6 +53,7 @@ import java.util.stream.IntStream;
import java.util.stream.Stream;
import org.apache.ignite.hlc.HybridClock;
import org.apache.ignite.internal.replicator.ReplicaService;
+import org.apache.ignite.internal.replicator.ReplicationGroupId;
import org.apache.ignite.internal.replicator.message.ReplicaRequest;
import org.apache.ignite.internal.schema.BinaryRow;
import org.apache.ignite.internal.schema.BinaryRowEx;
@@ -72,6 +73,7 @@ import org.apache.ignite.internal.table.distributed.command.UpdateCommand;
import org.apache.ignite.internal.table.distributed.command.response.MultiRowsResponse;
import org.apache.ignite.internal.table.distributed.replication.request.ReadWriteMultiRowReplicaRequest;
import org.apache.ignite.internal.table.distributed.replication.request.ReadWriteSingleRowReplicaRequest;
+import org.apache.ignite.internal.table.distributed.replicator.TablePartitionId;
import org.apache.ignite.internal.table.distributed.storage.InternalTableImpl;
import org.apache.ignite.internal.table.impl.DummyInternalTableImpl;
import org.apache.ignite.internal.table.impl.DummySchemaManagerImpl;
@@ -129,10 +131,11 @@ public class ItColocationTest {
TxManager txManager = new TxManagerImpl(replicaService, new HeapLockManager(), new HybridClock()) {
@Override
public CompletableFuture<Void> finish(
+ ReplicationGroupId commitPartition,
ClusterNode recipientNode,
Long term,
boolean commit,
- Map<ClusterNode, List<IgniteBiTuple<String, Long>>> groups,
+ Map<ClusterNode, List<IgniteBiTuple<ReplicationGroupId, Long>>> groups,
UUID txId) {
return completedFuture(null);
}
@@ -140,10 +143,12 @@ public class ItColocationTest {
txManager.start();
Int2ObjectMap<RaftGroupService> partRafts = new Int2ObjectOpenHashMap<>();
- Map<String, RaftGroupService> groupRafts = new HashMap<>();
+ Map<ReplicationGroupId, RaftGroupService> groupRafts = new HashMap<>();
+
+ UUID tblId = UUID.randomUUID();
for (int i = 0; i < PARTS; ++i) {
- String groupId = "PUBLIC.TEST_part_" + i;
+ TablePartitionId groupId = new TablePartitionId(tblId, i);
RaftGroupService r = Mockito.mock(RaftGroupService.class);
when(r.leader()).thenReturn(Mockito.mock(Peer.class));
@@ -168,11 +173,12 @@ public class ItColocationTest {
}).when(r).run(any());
partRafts.put(i, r);
- groupRafts.put(groupId, r);
+ groupRafts.put(new TablePartitionId(tblId, i), r);
}
when(replicaService.invoke(any(), any())).thenAnswer(invocation -> {
ReplicaRequest request = invocation.getArgument(1);
+ var commitPartId = new TablePartitionId(UUID.randomUUID(), 0);
RaftGroupService r = groupRafts.get(request.groupId());
@@ -181,17 +187,22 @@ public class ItColocationTest {
.stream()
.collect(toMap(row -> new RowId(0), row -> row));
- return r.run(new UpdateAllCommand(rows, UUID.randomUUID()));
+ return r.run(new UpdateAllCommand(commitPartId, rows, UUID.randomUUID()));
} else {
assertThat(request, is(instanceOf(ReadWriteSingleRowReplicaRequest.class)));
- return r.run(new UpdateCommand(new RowId(0), ((ReadWriteSingleRowReplicaRequest) request).binaryRow(), UUID.randomUUID()));
+ return r.run(new UpdateCommand(
+ commitPartId,
+ new RowId(0),
+ ((ReadWriteSingleRowReplicaRequest) request).binaryRow(),
+ UUID.randomUUID())
+ );
}
});
INT_TABLE = new InternalTableImpl(
"PUBLIC.TEST",
- UUID.randomUUID(),
+ tblId,
partRafts,
PARTS,
null,
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
index 14a6cfe21a..42fc63b908 100644
--- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
@@ -124,6 +124,7 @@ import org.apache.ignite.internal.table.distributed.raft.snapshot.PartitionSnaps
import org.apache.ignite.internal.table.distributed.raft.snapshot.outgoing.OutgoingSnapshotsManager;
import org.apache.ignite.internal.table.distributed.replicator.PartitionReplicaListener;
import org.apache.ignite.internal.table.distributed.replicator.PlacementDriver;
+import org.apache.ignite.internal.table.distributed.replicator.TablePartitionId;
import org.apache.ignite.internal.table.distributed.storage.InternalTableImpl;
import org.apache.ignite.internal.table.event.TableEvent;
import org.apache.ignite.internal.table.event.TableEventParameters;
@@ -585,9 +586,9 @@ public class TableManager extends Producer<TableEvent, TableEventParameters> imp
CompletableFuture<?>[] futures = new CompletableFuture<?>[partCnt];
for (int i = 0; i < partCnt; i++) {
- String partId = partitionRaftGroupName(((ExtendedTableConfiguration) tblCfg).id().value(), i);
+ TablePartitionId replicaGrpId = new TablePartitionId(((ExtendedTableConfiguration) tblCfg).id().value(), i);
- futures[i] = updatePendingAssignmentsKeys(tblCfg.name().value(), partId, baselineMgr.nodes(), newReplicas,
+ futures[i] = updatePendingAssignmentsKeys(tblCfg.name().value(), replicaGrpId, baselineMgr.nodes(), newReplicas,
replicasCtx.storageRevision(), metaStorageMgr, i);
}
@@ -683,9 +684,9 @@ public class TableManager extends Producer<TableEvent, TableEventParameters> imp
// other cases will be covered by rebalance logic
Set<ClusterNode> nodes = (oldPartAssignment.isEmpty()) ? newPartAssignment : Collections.emptySet();
- String grpId = partitionRaftGroupName(tblId, partId);
+ TablePartitionId replicaGrpId = new TablePartitionId(tblId, partId);
- placementDriver.updateAssignment(grpId, nodes);
+ placementDriver.updateAssignment(replicaGrpId, nodes);
CompletableFuture<Void> startGroupFut = CompletableFuture.completedFuture(null);
@@ -712,8 +713,7 @@ public class TableManager extends Producer<TableEvent, TableEventParameters> imp
boolean majorityAvailable = dataNodesCount >= (partAssignments.size() / 2) + 1;
if (majorityAvailable) {
- RebalanceUtil.startPeerRemoval(partitionRaftGroupName(tblId, partId), localMember,
- metaStorageMgr);
+ RebalanceUtil.startPeerRemoval(replicaGrpId, localMember, metaStorageMgr);
return false;
} else {
@@ -738,7 +738,7 @@ public class TableManager extends Producer<TableEvent, TableEventParameters> imp
try {
raftMgr.startRaftGroupNode(
- grpId,
+ replicaGrpId,
newPartAssignment,
new PartitionListener(
partitionStorage,
@@ -749,7 +749,7 @@ public class TableManager extends Producer<TableEvent, TableEventParameters> imp
new RebalanceRaftGroupEventsListener(
metaStorageMgr,
tablesCfg.tables().get(tablesById.get(tblId).name()),
- grpId,
+ replicaGrpId,
partId,
busyLock,
movePartition(() -> internalTbl.partitionRaftGroupService(partId)),
@@ -770,7 +770,7 @@ public class TableManager extends Producer<TableEvent, TableEventParameters> imp
futures[partId] = startGroupFut
.thenComposeAsync((v) -> {
try {
- return raftMgr.startRaftGroupService(grpId, newPartAssignment);
+ return raftMgr.startRaftGroupService(replicaGrpId, newPartAssignment);
} catch (NodeStoppingException ex) {
return CompletableFuture.failedFuture(ex);
}
@@ -784,14 +784,13 @@ public class TableManager extends Producer<TableEvent, TableEventParameters> imp
MvPartitionStorage partitionStorage = internalTbl.storage().getOrCreateMvPartition(partId);
try {
- replicaMgr.startReplica(grpId,
+ replicaMgr.startReplica(replicaGrpId,
new PartitionReplicaListener(
partitionStorage,
updatedRaftGroupService,
txManager,
lockMgr,
partId,
- grpId,
tblId,
primaryIndex,
clock,
@@ -905,9 +904,11 @@ public class TableManager extends Producer<TableEvent, TableEventParameters> imp
for (TableImpl table : tables.values()) {
try {
for (int p = 0; p < table.internalTable().partitions(); p++) {
- raftMgr.stopRaftGroup(partitionRaftGroupName(table.tableId(), p));
+ TablePartitionId replicationGroupId = new TablePartitionId(table.tableId(), p);
- replicaMgr.stopReplica(partitionRaftGroupName(table.tableId(), p));
+ raftMgr.stopRaftGroup(replicationGroupId);
+
+ replicaMgr.stopReplica(replicationGroupId);
}
table.internalTable().storage().stop();
@@ -1057,9 +1058,11 @@ public class TableManager extends Producer<TableEvent, TableEventParameters> imp
int partitions = assignment.size();
for (int p = 0; p < partitions; p++) {
- raftMgr.stopRaftGroup(partitionRaftGroupName(tblId, p));
+ TablePartitionId replicationGroupId = new TablePartitionId(tblId, p);
+
+ raftMgr.stopRaftGroup(replicationGroupId);
- replicaMgr.stopReplica(partitionRaftGroupName(tblId, p));
+ replicaMgr.stopReplica(replicationGroupId);
}
tablesByIdVv.update(causalityToken, (previousVal, e) -> inBusyLock(busyLock, () -> {
@@ -1096,18 +1099,6 @@ public class TableManager extends Producer<TableEvent, TableEventParameters> imp
return AffinityUtils.calculateAssignmentForPartition(baselineMgr.nodes(), partNum, tableCfg.value().replicas());
}
- /**
- * Compounds a RAFT group unique name.
- *
- * @param tblId Table identifier.
- * @param partition Number of table partitions.
- * @return A RAFT group name.
- */
- @NotNull
- private String partitionRaftGroupName(UUID tblId, int partition) {
- return tblId + "_part_" + partition;
- }
-
/**
* Creates a new table with the given {@code name} asynchronously. If a table with the same name already exists,
* a future will be completed with {@link TableAlreadyExistsException}.
@@ -1682,12 +1673,12 @@ public class TableManager extends Producer<TableEvent, TableEventParameters> imp
int partId = extractPartitionNumber(pendingAssignmentsWatchEvent.key());
UUID tblId = extractTableId(pendingAssignmentsWatchEvent.key(), PENDING_ASSIGNMENTS_PREFIX);
- String grpId = partitionRaftGroupName(tblId, partId);
+ TablePartitionId replicaGrpId = new TablePartitionId(tblId, partId);
// Assignments of the pending rebalance that we received through the meta storage watch mechanism.
Set<ClusterNode> newPeers = ByteUtils.fromBytes(pendingAssignmentsWatchEvent.value());
- var pendingAssignments = metaStorageMgr.get(pendingPartAssignmentsKey(grpId)).join();
+ var pendingAssignments = metaStorageMgr.get(pendingPartAssignmentsKey(replicaGrpId)).join();
assert pendingAssignmentsWatchEvent.revision() <= pendingAssignments.revision()
: "Meta Storage watch cannot notify about an event with the revision that is more than the actual revision.";
@@ -1702,7 +1693,7 @@ public class TableManager extends Producer<TableEvent, TableEventParameters> imp
+ tbl.internalTable().storage().getClass().getName();
// Stable assignments from the meta store, which revision is bounded by the current pending event.
- byte[] stableAssignments = metaStorageMgr.get(stablePartAssignmentsKey(grpId),
+ byte[] stableAssignments = metaStorageMgr.get(stablePartAssignmentsKey(replicaGrpId),
pendingAssignmentsWatchEvent.revision()).join().value();
Set<ClusterNode> assignments = stableAssignments == null
@@ -1710,7 +1701,7 @@ public class TableManager extends Producer<TableEvent, TableEventParameters> imp
? ((List<Set<ClusterNode>>) ByteUtils.fromBytes(tblCfg.assignments().value())).get(partId)
: ByteUtils.fromBytes(stableAssignments);
- placementDriver.updateAssignment(grpId, assignments);
+ placementDriver.updateAssignment(replicaGrpId, assignments);
ClusterNode localMember = raftMgr.topologyService().localMember();
@@ -1745,7 +1736,7 @@ public class TableManager extends Producer<TableEvent, TableEventParameters> imp
RaftGroupEventsListener raftGrpEvtsLsnr = new RebalanceRaftGroupEventsListener(
metaStorageMgr,
tblCfg,
- grpId,
+ replicaGrpId,
partId,
busyLock,
movePartition(() -> tbl.internalTable().partitionRaftGroupService(partId)),
@@ -1754,7 +1745,7 @@ public class TableManager extends Producer<TableEvent, TableEventParameters> imp
);
raftMgr.startRaftGroupNode(
- grpId,
+ replicaGrpId,
assignments,
raftGrpLsnr,
raftGrpEvtsLsnr,
@@ -1765,14 +1756,13 @@ public class TableManager extends Producer<TableEvent, TableEventParameters> imp
if (replicaMgr.shouldHaveReplicationGroupLocally(deltaPeers)) {
MvPartitionStorage partitionStorage = tbl.internalTable().storage().getOrCreateMvPartition(partId);
- replicaMgr.startReplica(grpId,
+ replicaMgr.startReplica(replicaGrpId,
new PartitionReplicaListener(
partitionStorage,
tbl.internalTable().partitionRaftGroupService(partId),
txManager,
lockMgr,
partId,
- grpId,
tblId,
primaryIndex,
clock,
@@ -1802,7 +1792,7 @@ public class TableManager extends Producer<TableEvent, TableEventParameters> imp
if (localMember.address().equals(leaderWithTerm.get1().address())) {
LOG.info("Current node={} is the leader of partition raft group={}. "
+ "Initiate rebalance process for partition={}, table={}",
- localMember.address(), grpId, partId, tbl.name());
+ localMember.address(), replicaGrpId, partId, tbl.name());
partGrpSvc.changePeersAsync(newNodes, leaderWithTerm.get2()).join();
}
@@ -1838,11 +1828,11 @@ public class TableManager extends Producer<TableEvent, TableEventParameters> imp
int part = extractPartitionNumber(stableAssignmentsWatchEvent.key());
UUID tblId = extractTableId(stableAssignmentsWatchEvent.key(), STABLE_ASSIGNMENTS_PREFIX);
- String partId = partitionRaftGroupName(tblId, part);
+ TablePartitionId replicaGrpId = new TablePartitionId(tblId, part);
Set<ClusterNode> stableAssignments = ByteUtils.fromBytes(stableAssignmentsWatchEvent.value());
- byte[] pendingFromMetastorage = metaStorageMgr.get(pendingPartAssignmentsKey(partId),
+ byte[] pendingFromMetastorage = metaStorageMgr.get(pendingPartAssignmentsKey(replicaGrpId),
stableAssignmentsWatchEvent.revision()).join().value();
Set<ClusterNode> pendingAssignments = pendingFromMetastorage == null
@@ -1853,9 +1843,9 @@ public class TableManager extends Producer<TableEvent, TableEventParameters> imp
ClusterNode localMember = raftMgr.topologyService().localMember();
if (!stableAssignments.contains(localMember) && !pendingAssignments.contains(localMember)) {
- raftMgr.stopRaftGroup(partId);
+ raftMgr.stopRaftGroup(replicaGrpId);
- replicaMgr.stopReplica(partId);
+ replicaMgr.stopReplica(new TablePartitionId(tblId, part));
}
} catch (NodeStoppingException e) {
// no-op
@@ -1881,7 +1871,7 @@ public class TableManager extends Producer<TableEvent, TableEventParameters> imp
int partitionNumber = extractPartitionNumber(key);
UUID tblId = extractTableId(key, ASSIGNMENTS_SWITCH_REDUCE_PREFIX);
- String partitionId = partitionRaftGroupName(tblId, partitionNumber);
+ TablePartitionId replicaGrpId = new TablePartitionId(tblId, partitionNumber);
TableImpl tbl = tablesByIdVv.latest().get(tblId);
@@ -1892,7 +1882,7 @@ public class TableManager extends Producer<TableEvent, TableEventParameters> imp
baselineMgr.nodes(),
tblCfg.value().replicas(),
partitionNumber,
- partitionId,
+ replicaGrpId,
evt
);
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/command/FinishTxCommand.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/command/FinishTxCommand.java
index e513d74152..750f359924 100644
--- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/command/FinishTxCommand.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/command/FinishTxCommand.java
@@ -20,6 +20,7 @@ package org.apache.ignite.internal.table.distributed.command;
import java.util.List;
import java.util.UUID;
import org.apache.ignite.hlc.HybridTimestamp;
+import org.apache.ignite.internal.replicator.ReplicationGroupId;
/**
* State machine command to finish a transaction on a commit or a rollback.
@@ -38,7 +39,7 @@ public class FinishTxCommand extends PartitionCommand {
/**
* Replication groups ids.
*/
- private final List<String> replicationGroupIds;
+ private final List<ReplicationGroupId> replicationGroupIds;
/**
* The constructor.
@@ -48,7 +49,7 @@ public class FinishTxCommand extends PartitionCommand {
* @param commitTimestamp Transaction commit timestamp.
* @param replicationGroupIds Set of replication groups ids.
*/
- public FinishTxCommand(UUID txId, boolean commit, HybridTimestamp commitTimestamp, List<String> replicationGroupIds) {
+ public FinishTxCommand(UUID txId, boolean commit, HybridTimestamp commitTimestamp, List<ReplicationGroupId> replicationGroupIds) {
super(txId);
this.commit = commit;
this.commitTimestamp = commitTimestamp;
@@ -78,7 +79,7 @@ public class FinishTxCommand extends PartitionCommand {
*
* @return An ordered replication groups ids.
*/
- public List<String> replicationGroupIds() {
+ public List<ReplicationGroupId> replicationGroupIds() {
return replicationGroupIds;
}
}
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/command/UpdateAllCommand.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/command/UpdateAllCommand.java
index 67d4f305be..2f2a88500e 100644
--- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/command/UpdateAllCommand.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/command/UpdateAllCommand.java
@@ -23,6 +23,7 @@ import java.util.Map;
import java.util.UUID;
import org.apache.ignite.internal.schema.BinaryRow;
import org.apache.ignite.internal.storage.RowId;
+import org.apache.ignite.internal.table.distributed.replicator.TablePartitionId;
import org.apache.ignite.internal.util.CollectionUtils;
import org.jetbrains.annotations.NotNull;
@@ -30,6 +31,9 @@ import org.jetbrains.annotations.NotNull;
* State machine command for updating a batch of entries.
*/
public class UpdateAllCommand extends PartitionCommand {
+ /** Committed table partition id. */
+ private final TablePartitionId commitReplicationGroupId;
+
/** Rows to update. */
private transient HashMap<RowId, BinaryRow> rowsToUpdate;
@@ -39,33 +43,43 @@ public class UpdateAllCommand extends PartitionCommand {
/**
* Constructor for batch remove.
*
+ * @param commitReplicationGroupId Committed table partition id.
* @param removeRows Ids to remove.
* @param txId Transaction id.
*/
- public UpdateAllCommand(Collection<RowId> removeRows, @NotNull UUID txId) {
- this(removeRows, null, txId);
+ public UpdateAllCommand(@NotNull TablePartitionId commitReplicationGroupId, Collection<RowId> removeRows, @NotNull UUID txId) {
+ this(commitReplicationGroupId, removeRows, null, txId);
}
/**
* Constructor for a batch update.
*
+ * @param commitReplicationGroupId Committed table partition id.
* @param rowsToUpdate Rows to update or insert.
* @param txId Transaction id.
*/
- public UpdateAllCommand(Map<RowId, BinaryRow> rowsToUpdate, @NotNull UUID txId) {
- this(null, rowsToUpdate, txId);
+ public UpdateAllCommand(@NotNull TablePartitionId commitReplicationGroupId, Map<RowId, BinaryRow> rowsToUpdate, @NotNull UUID txId) {
+ this(commitReplicationGroupId, null, rowsToUpdate, txId);
}
/**
* The constructor.
*
+ * @param commitReplicationGroupId Committed table partition id.
* @param removeRows Ids to remove.
* @param rowsToUpdate Rows to update or insert.
* @param txId Transaction id.
*/
- private UpdateAllCommand(Collection<RowId> removeRows, Map<RowId, BinaryRow> rowsToUpdate, @NotNull UUID txId) {
+ private UpdateAllCommand(
+ @NotNull TablePartitionId commitReplicationGroupId,
+ Collection<RowId> removeRows,
+ Map<RowId, BinaryRow> rowsToUpdate,
+ @NotNull UUID txId
+ ) {
super(txId);
+ this.commitReplicationGroupId = commitReplicationGroupId;
+
int size = (removeRows == null ? 0 : removeRows.size()) + (rowsToUpdate == null ? 0 : rowsToUpdate.size());
HashMap<RowId, BinaryRow> rows = new HashMap<>(size);
@@ -83,6 +97,15 @@ public class UpdateAllCommand extends PartitionCommand {
rowsToUpdateBytes = CommandUtils.rowMapToBytes(rows);
}
+ /**
+ * Gets the table partition id of the commit partition.
+ *
+ * @return Table partition id.
+ */
+ public TablePartitionId getReplicationGroupId() {
+ return commitReplicationGroupId;
+ }
+
/**
* Gets rows to update.
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/command/UpdateCommand.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/command/UpdateCommand.java
index 8143052f5c..719db8e50e 100644
--- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/command/UpdateCommand.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/command/UpdateCommand.java
@@ -21,6 +21,7 @@ import java.util.UUID;
import org.apache.ignite.internal.schema.BinaryRow;
import org.apache.ignite.internal.schema.ByteBufferRow;
import org.apache.ignite.internal.storage.RowId;
+import org.apache.ignite.internal.table.distributed.replicator.TablePartitionId;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
@@ -28,6 +29,9 @@ import org.jetbrains.annotations.Nullable;
* State machine command to update a row specified by a row id.
*/
public class UpdateCommand extends PartitionCommand {
+ /** Committed table partition id. */
+ private final TablePartitionId commitReplicationGroupId;
+
/** Id of a row that will be updated. */
private final RowId rowId;
@@ -40,14 +44,21 @@ public class UpdateCommand extends PartitionCommand {
/**
* Creates a new instance of UpdateCommand with the given row to be updated. The {@code rowId} should not be {@code null}.
*
+ * @param commitReplicationGroupId Committed table partition id.
* @param rowId Row id.
* @param row Binary row.
* @param txId The transaction id.
* @see PartitionCommand
*/
- public UpdateCommand(@NotNull RowId rowId, @Nullable BinaryRow row, @NotNull UUID txId) {
+ public UpdateCommand(
+ @NotNull TablePartitionId commitReplicationGroupId,
+ @NotNull RowId rowId,
+ @Nullable BinaryRow row,
+ @NotNull UUID txId
+ ) {
super(txId);
+ this.commitReplicationGroupId = commitReplicationGroupId;
this.rowId = rowId;
this.row = row;
@@ -57,11 +68,21 @@ public class UpdateCommand extends PartitionCommand {
/**
* Constructor for remove operation.
*
+ * @param commitReplicationGroupId Committed table partition id.
* @param rowId Row id.
* @param txId Transaction id.
*/
- public UpdateCommand(@NotNull RowId rowId, @NotNull UUID txId) {
- this(rowId, null, txId);
+ public UpdateCommand(@NotNull TablePartitionId commitReplicationGroupId, @NotNull RowId rowId, @NotNull UUID txId) {
+ this(commitReplicationGroupId, rowId, null, txId);
+ }
+
+ /**
+ * Gets the table partition id of the commit partition.
+ *
+ * @return Table partition id.
+ */
+ public TablePartitionId getCommitReplicationGroupId() {
+ return commitReplicationGroupId;
}
/**
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionListener.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionListener.java
index b7e7f1f0c9..28f600d67c 100644
--- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionListener.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionListener.java
@@ -164,9 +164,10 @@ public class PartitionListener implements RaftGroupListener {
BinaryRow row = cmd.getRow();
RowId rowId = cmd.getRowId();
UUID txId = cmd.txId();
+ UUID commitTblId = cmd.getCommitReplicationGroupId().getTableId();
+ int commitPartId = cmd.getCommitReplicationGroupId().getPartId();
- // TODO: IGNITE-17759 Need pass appropriate commitTableId and commitPartitionId.
- storage.addWrite(rowId, row, txId, UUID.randomUUID(), 0);
+ storage.addWrite(rowId, row, txId, commitTblId, commitPartId);
txsPendingRowIds.computeIfAbsent(txId, entry -> new HashSet<>()).add(rowId);
@@ -208,13 +209,15 @@ public class PartitionListener implements RaftGroupListener {
storage.runConsistently(() -> {
UUID txId = cmd.txId();
Map<RowId, BinaryRow> rowsToUpdate = cmd.getRowsToUpdate();
+ UUID commitTblId = cmd.getReplicationGroupId().getTableId();
+ int commitPartId = cmd.getReplicationGroupId().getPartId();
if (!CollectionUtils.nullOrEmpty(rowsToUpdate)) {
for (Map.Entry<RowId, BinaryRow> entry : rowsToUpdate.entrySet()) {
RowId rowId = entry.getKey();
BinaryRow row = entry.getValue();
- // TODO: IGNITE-17759 Need pass appropriate commitTableId and commitPartitionId.
- storage.addWrite(rowId, row, txId, UUID.randomUUID(), 0);
+
+ storage.addWrite(rowId, row, txId, commitTblId, commitPartId);
txsPendingRowIds.computeIfAbsent(txId, entry0 -> new HashSet<>()).add(rowId);
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/RebalanceRaftGroupEventsListener.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/RebalanceRaftGroupEventsListener.java
index 779850d8dd..a231703a90 100644
--- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/RebalanceRaftGroupEventsListener.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/RebalanceRaftGroupEventsListener.java
@@ -57,6 +57,7 @@ import org.apache.ignite.internal.metastorage.client.Update;
import org.apache.ignite.internal.raft.server.RaftGroupEventsListener;
import org.apache.ignite.internal.schema.configuration.ExtendedTableChange;
import org.apache.ignite.internal.schema.configuration.TableConfiguration;
+import org.apache.ignite.internal.table.distributed.replicator.TablePartitionId;
import org.apache.ignite.internal.util.ByteUtils;
import org.apache.ignite.internal.util.IgniteSpinBusyLock;
import org.apache.ignite.lang.ByteArray;
@@ -112,7 +113,7 @@ public class RebalanceRaftGroupEventsListener implements RaftGroupEventsListener
private final TableConfiguration tblConfiguration;
/** Unique partition id. */
- private final String partId;
+ private final TablePartitionId partId;
/** Partition number. */
private final int partNum;
@@ -147,7 +148,7 @@ public class RebalanceRaftGroupEventsListener implements RaftGroupEventsListener
public RebalanceRaftGroupEventsListener(
MetaStorageManager metaStorageMgr,
TableConfiguration tblConfiguration,
- String partId,
+ TablePartitionId partId,
int partNum,
IgniteSpinBusyLock busyLock,
BiFunction<List<Peer>, Long, CompletableFuture<Void>> movePartitionFn,
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replication/request/ReadWriteMultiRowReplicaRequest.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replication/request/ReadWriteMultiRowReplicaRequest.java
index 8cf5e73a50..4ddd3d9778 100644
--- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replication/request/ReadWriteMultiRowReplicaRequest.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replication/request/ReadWriteMultiRowReplicaRequest.java
@@ -18,6 +18,8 @@
package org.apache.ignite.internal.table.distributed.replication.request;
import org.apache.ignite.internal.table.distributed.TableMessageGroup;
+import org.apache.ignite.internal.table.distributed.replicator.TablePartitionId;
+import org.apache.ignite.network.annotations.Marshallable;
import org.apache.ignite.network.annotations.Transferable;
/**
@@ -25,4 +27,11 @@ import org.apache.ignite.network.annotations.Transferable;
*/
@Transferable(TableMessageGroup.RW_MULTI_ROW_REPLICA_REQUEST)
public interface ReadWriteMultiRowReplicaRequest extends MultipleRowReplicaRequest, ReadWriteReplicaRequest {
+ /**
+ * Gets a commit partition id.
+ *
+ * @return Table partition id.
+ */
+ @Marshallable
+ TablePartitionId commitPartitionId();
}
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replication/request/ReadWriteSingleRowReplicaRequest.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replication/request/ReadWriteSingleRowReplicaRequest.java
index d6d9d5d765..97b2db0ff9 100644
--- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replication/request/ReadWriteSingleRowReplicaRequest.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replication/request/ReadWriteSingleRowReplicaRequest.java
@@ -18,6 +18,8 @@
package org.apache.ignite.internal.table.distributed.replication.request;
import org.apache.ignite.internal.table.distributed.TableMessageGroup;
+import org.apache.ignite.internal.table.distributed.replicator.TablePartitionId;
+import org.apache.ignite.network.annotations.Marshallable;
import org.apache.ignite.network.annotations.Transferable;
/**
@@ -25,4 +27,11 @@ import org.apache.ignite.network.annotations.Transferable;
*/
@Transferable(TableMessageGroup.RW_SINGLE_ROW_REPLICA_REQUEST)
public interface ReadWriteSingleRowReplicaRequest extends SingleRowReplicaRequest, ReadWriteReplicaRequest {
+ /**
+ * Gets a commit partition id.
+ *
+ * @return Table partition id.
+ */
+ @Marshallable
+ TablePartitionId commitPartitionId();
}
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replication/request/ReadWriteSwapRowReplicaRequest.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replication/request/ReadWriteSwapRowReplicaRequest.java
index 9df95860ea..1f4ddb5e63 100644
--- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replication/request/ReadWriteSwapRowReplicaRequest.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replication/request/ReadWriteSwapRowReplicaRequest.java
@@ -18,6 +18,8 @@
package org.apache.ignite.internal.table.distributed.replication.request;
import org.apache.ignite.internal.table.distributed.TableMessageGroup;
+import org.apache.ignite.internal.table.distributed.replicator.TablePartitionId;
+import org.apache.ignite.network.annotations.Marshallable;
import org.apache.ignite.network.annotations.Transferable;
/**
@@ -25,4 +27,11 @@ import org.apache.ignite.network.annotations.Transferable;
*/
@Transferable(TableMessageGroup.RW_DUAL_ROW_REPLICA_REQUEST)
public interface ReadWriteSwapRowReplicaRequest extends SwapRowReplicaRequest, ReadWriteReplicaRequest {
+ /**
+ * Gets a commit partition id.
+ *
+ * @return Table partition id.
+ */
+ @Marshallable
+ TablePartitionId commitPartitionId();
}
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
index 64c8a2079f..91f807c1d5 100644
--- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
@@ -37,6 +37,7 @@ import java.util.function.Supplier;
import java.util.stream.Collectors;
import org.apache.ignite.hlc.HybridClock;
import org.apache.ignite.hlc.HybridTimestamp;
+import org.apache.ignite.internal.replicator.ReplicationGroupId;
import org.apache.ignite.internal.replicator.exception.PrimaryReplicaMissException;
import org.apache.ignite.internal.replicator.exception.ReplicationException;
import org.apache.ignite.internal.replicator.exception.ReplicationTimeoutException;
@@ -92,7 +93,7 @@ public class PartitionReplicaListener implements ReplicaListener {
private static final TxMessagesFactory FACTORY = new TxMessagesFactory();
/** Replication group id. */
- private final String replicationGroupId;
+ private final TablePartitionId replicationGroupId;
/** Partition id. */
private final int partId;
@@ -154,7 +155,6 @@ public class PartitionReplicaListener implements ReplicaListener {
* @param txManager Transaction manager.
* @param lockManager Lock manager.
* @param partId Partition id.
- * @param replicationGroupId replication group id.
* @param tableId Table id.
* @param primaryIndex Primary index.
* @param hybridClock Hybrid clock.
@@ -168,7 +168,6 @@ public class PartitionReplicaListener implements ReplicaListener {
TxManager txManager,
LockManager lockManager,
int partId,
- String replicationGroupId,
UUID tableId,
ConcurrentHashMap<ByteBuffer, RowId> primaryIndex,
HybridClock hybridClock,
@@ -181,7 +180,6 @@ public class PartitionReplicaListener implements ReplicaListener {
this.txManager = txManager;
this.lockManager = lockManager;
this.partId = partId;
- this.replicationGroupId = replicationGroupId;
this.tableId = tableId;
this.primaryIndex = primaryIndex;
this.hybridClock = hybridClock;
@@ -192,6 +190,7 @@ public class PartitionReplicaListener implements ReplicaListener {
//TODO: IGNITE-17479 Integrate indexes into replicaListener command handlers
this.indexScanId = new UUID(tableId.getMostSignificantBits(), tableId.getLeastSignificantBits() + 1);
this.indexPkId = new UUID(tableId.getMostSignificantBits(), tableId.getLeastSignificantBits() + 2);
+ this.replicationGroupId = new TablePartitionId(tableId, partId);
cursors = new ConcurrentSkipListMap<>((o1, o2) -> {
if (o1 == o2) {
@@ -511,7 +510,7 @@ public class PartitionReplicaListener implements ReplicaListener {
*/
// TODO: need to properly handle primary replica changes https://issues.apache.org/jira/browse/IGNITE-17615
private CompletableFuture<Object> processTxFinishAction(TxFinishReplicaRequest request) {
- List<String> aggregatedGroupIds = request.groups().values().stream()
+ List<ReplicationGroupId> aggregatedGroupIds = request.groups().values().stream()
.flatMap(List::stream).map(IgniteBiTuple::get1).collect(Collectors.toList());
UUID txId = request.txId();
@@ -548,7 +547,7 @@ public class PartitionReplicaListener implements ReplicaListener {
* @param commit True is the transaction is committed, false otherwise.
* @return Future to wait of the finish.
*/
- private CompletableFuture<Object> finishTransaction(List<String> aggregatedGroupIds, UUID txId, boolean commit) {
+ private CompletableFuture<Object> finishTransaction(List<ReplicationGroupId> aggregatedGroupIds, UUID txId, boolean commit) {
// TODO: IGNITE-17261 Timestamp from request is not using until the issue has not been fixed (request.commitTimestamp())
var fut = new CompletableFuture<TxMeta>();
@@ -662,6 +661,10 @@ public class PartitionReplicaListener implements ReplicaListener {
UUID indexId = indexIdOrDefault(indexPkId/*request.indexToUse()*/);
UUID txId = request.transactionId();
+ TablePartitionId committedPartitionId = request.commitPartitionId();
+
+ assert committedPartitionId != null || request.requestType() == RequestType.RW_GET_ALL
+ : "Commit partition partition is null [type=" + request.requestType() + ']';
switch (request.requestType()) {
case RW_GET_ALL: {
@@ -713,7 +716,7 @@ public class PartitionReplicaListener implements ReplicaListener {
}
CompletableFuture raftFut = rowIdsToDelete.isEmpty() ? CompletableFuture.completedFuture(null)
- : applyCmdWithExceptionHandling(new UpdateAllCommand(rowIdsToDelete, txId));
+ : applyCmdWithExceptionHandling(new UpdateAllCommand(committedPartitionId, rowIdsToDelete, txId));
return raftFut.thenApply(ignored -> result);
});
@@ -744,7 +747,7 @@ public class PartitionReplicaListener implements ReplicaListener {
}
CompletableFuture raftFut = rowIdsToDelete.isEmpty() ? CompletableFuture.completedFuture(null)
- : applyCmdWithExceptionHandling(new UpdateAllCommand(rowIdsToDelete, txId));
+ : applyCmdWithExceptionHandling(new UpdateAllCommand(committedPartitionId, rowIdsToDelete, txId));
return raftFut.thenApply(ignored -> result);
});
@@ -779,7 +782,7 @@ public class PartitionReplicaListener implements ReplicaListener {
}
CompletableFuture raftFut = rowsToInsert.isEmpty() ? CompletableFuture.completedFuture(null)
- : applyCmdWithExceptionHandling(new UpdateAllCommand(rowsToInsert, txId));
+ : applyCmdWithExceptionHandling(new UpdateAllCommand(committedPartitionId, rowsToInsert, txId));
return raftFut.thenApply(ignored -> result);
});
@@ -809,7 +812,7 @@ public class PartitionReplicaListener implements ReplicaListener {
}
CompletableFuture raftFut = rowsToUpdate.isEmpty() ? CompletableFuture.completedFuture(null)
- : applyCmdWithExceptionHandling(new UpdateAllCommand(rowsToUpdate, txId));
+ : applyCmdWithExceptionHandling(new UpdateAllCommand(committedPartitionId, rowsToUpdate, txId));
return raftFut.thenApply(ignored -> null);
});
@@ -852,14 +855,17 @@ public class PartitionReplicaListener implements ReplicaListener {
* @return Listener response.
*/
private CompletableFuture<Object> processSingleEntryAction(ReadWriteSingleRowReplicaRequest request) {
+ UUID txId = request.transactionId();
BinaryRow searchRow = request.binaryRow();
+ TablePartitionId commitPartitionId = request.commitPartitionId();
+
+ assert commitPartitionId != null || request.requestType() == RequestType.RW_GET :
+ "Commit partition partition is null [type=" + request.requestType() + ']';
ByteBuffer searchKey = searchRow.keySlice();
UUID indexId = indexIdOrDefault(indexPkId/*request.indexToUse()*/);
- UUID txId = request.transactionId();
-
switch (request.requestType()) {
case RW_GET: {
CompletableFuture<RowId> lockFut = takeLocksForGet(searchKey, indexId, txId);
@@ -877,8 +883,9 @@ public class PartitionReplicaListener implements ReplicaListener {
return lockFut.thenCompose(lockedRowId -> {
boolean removed = lockedRowId != null;
- CompletableFuture raftFut = removed ? applyCmdWithExceptionHandling(new UpdateCommand(lockedRowId, txId)) :
- CompletableFuture.completedFuture(null);
+ CompletableFuture raftFut =
+ removed ? applyCmdWithExceptionHandling(new UpdateCommand(commitPartitionId, lockedRowId, txId)) :
+ CompletableFuture.completedFuture(null);
return raftFut.thenApply(ignored -> removed);
});
@@ -890,8 +897,9 @@ public class PartitionReplicaListener implements ReplicaListener {
BinaryRow lockedRow = lockedRowId != null
? resolveReadResult(mvDataStorage.read(lockedRowId, HybridTimestamp.MAX_VALUE), txId) : null;
- CompletableFuture raftFut = lockedRowId != null ? applyCmdWithExceptionHandling(new UpdateCommand(lockedRowId, txId)) :
- CompletableFuture.completedFuture(null);
+ CompletableFuture raftFut =
+ lockedRowId != null ? applyCmdWithExceptionHandling(new UpdateCommand(commitPartitionId, lockedRowId, txId))
+ : CompletableFuture.completedFuture(null);
return raftFut.thenApply(ignored -> lockedRow);
});
@@ -902,8 +910,9 @@ public class PartitionReplicaListener implements ReplicaListener {
return lockFut.thenCompose(lockedRow -> {
boolean removed = lockedRow != null;
- CompletableFuture raftFut = removed ? applyCmdWithExceptionHandling(new UpdateCommand(lockedRow, txId)) :
- CompletableFuture.completedFuture(null);
+ CompletableFuture raftFut =
+ removed ? applyCmdWithExceptionHandling(new UpdateCommand(commitPartitionId, lockedRow, txId)) :
+ CompletableFuture.completedFuture(null);
return raftFut.thenApply(ignored -> removed);
});
@@ -915,7 +924,8 @@ public class PartitionReplicaListener implements ReplicaListener {
boolean inserted = lockedRowId == null;
CompletableFuture raftFut =
- lockedRowId == null ? applyCmdWithExceptionHandling(new UpdateCommand(new RowId(partId), searchRow, txId)) :
+ lockedRowId == null ? applyCmdWithExceptionHandling(
+ new UpdateCommand(commitPartitionId, new RowId(partId), searchRow, txId)) :
CompletableFuture.completedFuture(null);
return raftFut.thenApply(ignored -> inserted);
@@ -925,9 +935,9 @@ public class PartitionReplicaListener implements ReplicaListener {
CompletableFuture<RowId> lockFut = takeLocksForUpsert(searchKey, indexId, txId);
return lockFut.thenCompose(lockedRowId -> {
- CompletableFuture raftFut =
- lockedRowId != null ? applyCmdWithExceptionHandling(new UpdateCommand(lockedRowId, searchRow, txId)) :
- applyCmdWithExceptionHandling(new UpdateCommand(new RowId(partId), searchRow, txId));
+ CompletableFuture raftFut = lockedRowId != null ? applyCmdWithExceptionHandling(
+ new UpdateCommand(commitPartitionId, lockedRowId, searchRow, txId)) :
+ applyCmdWithExceptionHandling(new UpdateCommand(commitPartitionId, new RowId(partId), searchRow, txId));
return raftFut.thenApply(ignored -> null);
});
@@ -948,10 +958,10 @@ public class PartitionReplicaListener implements ReplicaListener {
BinaryRow result = rowId != null
? resolveReadResult(mvDataStorage.read(rowId, HybridTimestamp.MAX_VALUE), txId) : null;
- CompletableFuture raftFut =
- rowId != null ? applyCmdWithExceptionHandling(new UpdateCommand(rowId, searchRow, txId))
- : applyCmdWithExceptionHandling(
- new UpdateCommand(new RowId(partId), searchRow, txId));
+ CompletableFuture raftFut = rowId != null ? applyCmdWithExceptionHandling(
+ new UpdateCommand(commitPartitionId, rowId, searchRow, txId))
+ : applyCmdWithExceptionHandling(
+ new UpdateCommand(commitPartitionId, new RowId(partId), searchRow, txId));
return raftFut.thenApply(ignored -> result);
});
@@ -987,7 +997,8 @@ public class PartitionReplicaListener implements ReplicaListener {
return rowLockFut.thenCompose(lockedRow -> {
CompletableFuture raftFut = lockedRow == null ? CompletableFuture.completedFuture(null) :
- applyCmdWithExceptionHandling(new UpdateCommand(lockedRowId, searchRow, txId));
+ applyCmdWithExceptionHandling(
+ new UpdateCommand(commitPartitionId, lockedRowId, searchRow, txId));
return raftFut.thenApply(ignored -> lockedRow);
});
@@ -1000,8 +1011,9 @@ public class PartitionReplicaListener implements ReplicaListener {
return lockFut.thenCompose(lockedRowId -> {
boolean replaced = lockedRowId != null;
- CompletableFuture raftFut = replaced ? applyCmdWithExceptionHandling(new UpdateCommand(lockedRowId, searchRow, txId)) :
- CompletableFuture.completedFuture(null);
+ CompletableFuture raftFut =
+ replaced ? applyCmdWithExceptionHandling(new UpdateCommand(commitPartitionId, lockedRowId, searchRow, txId))
+ : CompletableFuture.completedFuture(null);
return raftFut.thenApply(ignored -> replaced);
});
@@ -1196,6 +1208,9 @@ public class PartitionReplicaListener implements ReplicaListener {
private CompletableFuture<Object> processTwoEntriesAction(ReadWriteSwapRowReplicaRequest request) {
BinaryRow searchRow = request.binaryRow();
BinaryRow oldRow = request.oldBinaryRow();
+ TablePartitionId commitPartitionId = request.commitPartitionId();
+
+ assert commitPartitionId != null : "Commit partition partition is null [type=" + request.requestType() + ']';
ByteBuffer searchKey = searchRow.keySlice();
@@ -1210,8 +1225,9 @@ public class PartitionReplicaListener implements ReplicaListener {
return lockFut.thenCompose(lockedRowId -> {
boolean replaced = lockedRowId != null;
- CompletableFuture raftFut = replaced ? applyCmdWithExceptionHandling(new UpdateCommand(lockedRowId, searchRow, txId)) :
- CompletableFuture.completedFuture(null);
+ CompletableFuture raftFut =
+ replaced ? applyCmdWithExceptionHandling(new UpdateCommand(commitPartitionId, lockedRowId, searchRow, txId))
+ : CompletableFuture.completedFuture(null);
return raftFut.thenApply(ignored -> replaced);
});
@@ -1396,7 +1412,7 @@ public class PartitionReplicaListener implements ReplicaListener {
HybridTimestamp timestamp,
Supplier<BinaryRow> lastCommitted
) {
- String commitGrpId = partitionRaftGroupName(readResult.commitTableId(), readResult.commitPartitionId());
+ ReplicationGroupId commitGrpId = new TablePartitionId(readResult.commitTableId(), readResult.commitPartitionId());
return placementDriver.sendMetaRequest(commitGrpId, FACTORY.txStateReplicaRequest()
.groupId(commitGrpId)
@@ -1415,16 +1431,4 @@ public class PartitionReplicaListener implements ReplicaListener {
}
});
}
-
- /**
- * Compounds a RAFT group unique name.
- *
- * @param tblId Table identifier.
- * @param partition Number of table partitions.
- * @return A RAFT group name.
- */
- @NotNull
- private String partitionRaftGroupName(UUID tblId, int partition) {
- return tblId + "_part_" + partition;
- }
}
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PlacementDriver.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PlacementDriver.java
index fc2e920ce8..fc5fecee45 100644
--- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PlacementDriver.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PlacementDriver.java
@@ -23,6 +23,7 @@ import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.ignite.internal.replicator.ReplicaService;
+import org.apache.ignite.internal.replicator.ReplicationGroupId;
import org.apache.ignite.internal.tx.TxMeta;
import org.apache.ignite.internal.tx.message.TxStateReplicaRequest;
import org.apache.ignite.lang.IgniteBiTuple;
@@ -33,7 +34,7 @@ import org.apache.ignite.network.ClusterNode;
*/
public class PlacementDriver {
/** Assignment nodes per replication group. */
- private final Map<String, LinkedHashSet<ClusterNode>> primaryReplicaMapping = new ConcurrentHashMap<>();
+ private final Map<ReplicationGroupId, LinkedHashSet<ClusterNode>> primaryReplicaMapping = new ConcurrentHashMap<>();
/** Replication service. */
private final ReplicaService replicaService;
@@ -54,7 +55,7 @@ public class PlacementDriver {
* @param request Status request.
* @return Result future.
*/
- public CompletableFuture<TxMeta> sendMetaRequest(String replicaGrp, TxStateReplicaRequest request) {
+ public CompletableFuture<TxMeta> sendMetaRequest(ReplicationGroupId replicaGrp, TxStateReplicaRequest request) {
CompletableFuture<TxMeta> resFut = new CompletableFuture<>();
sendAndRetry(resFut, replicaGrp, request);
@@ -68,7 +69,7 @@ public class PlacementDriver {
* @param replicaGrpId Replication group id.
* @param assignment Assignment.
*/
- public void updateAssignment(String replicaGrpId, Collection<ClusterNode> assignment) {
+ public void updateAssignment(ReplicationGroupId replicaGrpId, Collection<ClusterNode> assignment) {
primaryReplicaMapping.put(replicaGrpId, new LinkedHashSet<>(assignment));
}
@@ -80,7 +81,7 @@ public class PlacementDriver {
* @param replicaGrp Replication group id.
* @param request Request.
*/
- private void sendAndRetry(CompletableFuture<TxMeta> resFut, String replicaGrp, TxStateReplicaRequest request) {
+ private void sendAndRetry(CompletableFuture<TxMeta> resFut, ReplicationGroupId replicaGrp, TxStateReplicaRequest request) {
ClusterNode nodeToSend = primaryReplicaMapping.get(replicaGrp).iterator().next();
replicaService.invoke(nodeToSend, request).thenAccept(resp -> {
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/TablePartitionId.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/TablePartitionId.java
new file mode 100644
index 0000000000..01aeaf3e56
--- /dev/null
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/TablePartitionId.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.table.distributed.replicator;
+
+import java.util.UUID;
+import org.apache.ignite.internal.replicator.ReplicationGroupId;
+import org.jetbrains.annotations.NotNull;
+
+/**
+ * The class is used to identify a table replication group.
+ */
+public class TablePartitionId implements ReplicationGroupId {
+
+ /** Table id. */
+ private final UUID tableId;
+
+ /** Partition id. */
+ private final int partId;
+
+ /**
+ * The constructor.
+ *
+ * @param tableId Table id.
+ * @param partId Partition id.
+ */
+ public TablePartitionId(@NotNull UUID tableId, int partId) {
+ this.tableId = tableId;
+ this.partId = partId;
+ }
+
+ /**
+ * Gets a partition id.
+ *
+ * @return Partition id.
+ */
+ public int getPartId() {
+ return partId;
+ }
+
+ /**
+ * Gets a table id.
+ *
+ * @return Table id.
+ */
+ public UUID getTableId() {
+ return tableId;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+
+ TablePartitionId that = (TablePartitionId) o;
+
+ return partId == that.partId && tableId.equals(that.tableId);
+ }
+
+ @Override
+ public int hashCode() {
+ return tableId.hashCode() ^ partId;
+ }
+
+ @Override
+ public String toString() {
+ return tableId + "_part_" + partId;
+ }
+}
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java
index 08e8d622db..61b101aaaf 100644
--- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java
@@ -39,6 +39,7 @@ import java.util.function.Function;
import java.util.stream.Collectors;
import org.apache.ignite.hlc.HybridClock;
import org.apache.ignite.internal.replicator.ReplicaService;
+import org.apache.ignite.internal.replicator.ReplicationGroupId;
import org.apache.ignite.internal.replicator.exception.PrimaryReplicaMissException;
import org.apache.ignite.internal.replicator.message.ReplicaRequest;
import org.apache.ignite.internal.schema.BinaryRow;
@@ -48,15 +49,16 @@ import org.apache.ignite.internal.table.InternalTable;
import org.apache.ignite.internal.table.distributed.TableMessagesFactory;
import org.apache.ignite.internal.table.distributed.replication.request.ReadWriteScanRetrieveBatchReplicaRequest;
import org.apache.ignite.internal.table.distributed.replication.request.ReadWriteScanRetrieveBatchReplicaRequestBuilder;
+import org.apache.ignite.internal.table.distributed.replicator.TablePartitionId;
import org.apache.ignite.internal.table.distributed.replicator.action.RequestType;
import org.apache.ignite.internal.tx.InternalTransaction;
import org.apache.ignite.internal.tx.TxManager;
import org.apache.ignite.internal.tx.storage.state.TxStateTableStorage;
import org.apache.ignite.lang.IgniteBiTuple;
+import org.apache.ignite.lang.IgniteFiveFunction;
import org.apache.ignite.lang.IgniteInternalException;
import org.apache.ignite.lang.IgniteStringFormatter;
import org.apache.ignite.lang.IgniteTetraFunction;
-import org.apache.ignite.lang.IgniteTriFunction;
import org.apache.ignite.network.ClusterNode;
import org.apache.ignite.network.NetworkAddress;
import org.apache.ignite.raft.client.Peer;
@@ -190,7 +192,7 @@ public class InternalTableImpl implements InternalTable {
private <R> CompletableFuture<R> enlistInTx(
BinaryRowEx row,
InternalTransaction tx,
- IgniteTriFunction<InternalTransaction, String, Long, ReplicaRequest> op
+ IgniteTetraFunction<TablePartitionId, InternalTransaction, ReplicationGroupId, Long, ReplicaRequest> op
) {
final boolean implicit = tx == null;
@@ -198,14 +200,16 @@ public class InternalTableImpl implements InternalTable {
int partId = partId(row);
- String partGroupId = partitionMap.get(partId).groupId();
+ TablePartitionId partGroupId = new TablePartitionId(tableId, partId);
IgniteBiTuple<ClusterNode, Long> primaryReplicaAndTerm = tx0.enlistedNodeAndTerm(partGroupId);
CompletableFuture<R> fut;
if (primaryReplicaAndTerm != null) {
- ReplicaRequest request = op.apply(tx0, partGroupId, primaryReplicaAndTerm.get2());
+ TablePartitionId commitPart = (TablePartitionId) tx.commitPartition();
+
+ ReplicaRequest request = op.apply(commitPart, tx0, partGroupId, primaryReplicaAndTerm.get2());
try {
fut = replicaSvc.invoke(primaryReplicaAndTerm.get1(), request);
@@ -218,7 +222,7 @@ public class InternalTableImpl implements InternalTable {
fut = enlistWithRetry(
tx0,
partId,
- term -> op.apply(tx0, partGroupId, term),
+ (commitPart, term) -> op.apply(commitPart, tx0, partGroupId, term),
ATTEMPTS_TO_ENLIST_PARTITION
);
}
@@ -238,7 +242,7 @@ public class InternalTableImpl implements InternalTable {
private <T> CompletableFuture<T> enlistInTx(
Collection<BinaryRowEx> keyRows,
InternalTransaction tx,
- IgniteTetraFunction<Collection<BinaryRow>, InternalTransaction, String, Long, ReplicaRequest> op,
+ IgniteFiveFunction<TablePartitionId, Collection<BinaryRow>, InternalTransaction, ReplicationGroupId, Long, ReplicaRequest> op,
Function<CompletableFuture<Object>[], CompletableFuture<T>> reducer
) {
final boolean implicit = tx == null;
@@ -257,14 +261,16 @@ public class InternalTableImpl implements InternalTable {
int batchNum = 0;
for (Int2ObjectOpenHashMap.Entry<List<BinaryRow>> partToRows : keyRowsByPartition.int2ObjectEntrySet()) {
- String partGroupId = partitionMap.get(partToRows.getIntKey()).groupId();
+ TablePartitionId partGroupId = new TablePartitionId(tableId, partToRows.getIntKey());
IgniteBiTuple<ClusterNode, Long> primaryReplicaAndTerm = tx0.enlistedNodeAndTerm(partGroupId);
CompletableFuture<Object> fut;
if (primaryReplicaAndTerm != null) {
- ReplicaRequest request = op.apply(partToRows.getValue(), tx0, partGroupId, primaryReplicaAndTerm.get2());
+ TablePartitionId commitPart = (TablePartitionId) tx.commitPartition();
+
+ ReplicaRequest request = op.apply(commitPart, partToRows.getValue(), tx0, partGroupId, primaryReplicaAndTerm.get2());
try {
fut = replicaSvc.invoke(primaryReplicaAndTerm.get1(), request);
@@ -277,7 +283,7 @@ public class InternalTableImpl implements InternalTable {
fut = enlistWithRetry(
tx0,
partToRows.getIntKey(),
- term -> op.apply(partToRows.getValue(), tx0, partGroupId, term),
+ (commitPart, term) -> op.apply(commitPart, partToRows.getValue(), tx0, partGroupId, term),
ATTEMPTS_TO_ENLIST_PARTITION
);
}
@@ -305,7 +311,7 @@ public class InternalTableImpl implements InternalTable {
long scanId,
int batchSize
) {
- String partGroupId = partitionMap.get(partId).groupId();
+ TablePartitionId partGroupId = new TablePartitionId(tableId, partId);
IgniteBiTuple<ClusterNode, Long> primaryReplicaAndTerm = tx.enlistedNodeAndTerm(partGroupId);
@@ -329,7 +335,7 @@ public class InternalTableImpl implements InternalTable {
throw new TransactionException("Failed to invoke the replica request.");
}
} else {
- fut = enlistWithRetry(tx, partId, term -> requestBuilder.term(term).build(), ATTEMPTS_TO_ENLIST_PARTITION);
+ fut = enlistWithRetry(tx, partId, (commitPart, term) -> requestBuilder.term(term).build(), ATTEMPTS_TO_ENLIST_PARTITION);
}
return postEnlist(fut, false, tx);
@@ -347,7 +353,7 @@ public class InternalTableImpl implements InternalTable {
private <R> CompletableFuture<R> enlistWithRetry(
InternalTransaction tx,
int partId,
- Function<Long, ReplicaRequest> requestFunction,
+ BiFunction<TablePartitionId, Long, ReplicaRequest> requestFunction,
int attempts
) {
CompletableFuture<R> result = new CompletableFuture();
@@ -357,7 +363,7 @@ public class InternalTableImpl implements InternalTable {
try {
return replicaSvc.invoke(
primaryReplicaAndTerm.get1(),
- requestFunction.apply(primaryReplicaAndTerm.get2())
+ requestFunction.apply((TablePartitionId) tx.commitPartition(), primaryReplicaAndTerm.get2())
);
} catch (PrimaryReplicaMissException e) {
throw new TransactionException(e);
@@ -428,7 +434,7 @@ public class InternalTableImpl implements InternalTable {
return enlistInTx(
keyRow,
tx,
- (txo, groupId, term) -> tableMessagesFactory.readWriteSingleRowReplicaRequest()
+ (commitPart, txo, groupId, term) -> tableMessagesFactory.readWriteSingleRowReplicaRequest()
.groupId(groupId)
.binaryRow(keyRow)
.transactionId(txo.id())
@@ -445,7 +451,7 @@ public class InternalTableImpl implements InternalTable {
return enlistInTx(
keyRows,
tx,
- (keyRows0, txo, groupId, term) -> tableMessagesFactory.readWriteMultiRowReplicaRequest()
+ (commitPart, keyRows0, txo, groupId, term) -> tableMessagesFactory.readWriteMultiRowReplicaRequest()
.groupId(groupId)
.binaryRows(keyRows0)
.transactionId(txo.id())
@@ -462,8 +468,9 @@ public class InternalTableImpl implements InternalTable {
return enlistInTx(
row,
tx,
- (txo, groupId, term) -> tableMessagesFactory.readWriteSingleRowReplicaRequest()
+ (commitPart, txo, groupId, term) -> tableMessagesFactory.readWriteSingleRowReplicaRequest()
.groupId(groupId)
+ .commitPartitionId(commitPart)
.binaryRow(row)
.transactionId(txo.id())
.term(term)
@@ -478,8 +485,9 @@ public class InternalTableImpl implements InternalTable {
return enlistInTx(
rows,
tx,
- (keyRows0, txo, groupId, term) -> tableMessagesFactory.readWriteMultiRowReplicaRequest()
+ (commitPart, keyRows0, txo, groupId, term) -> tableMessagesFactory.readWriteMultiRowReplicaRequest()
.groupId(groupId)
+ .commitPartitionId(commitPart)
.binaryRows(keyRows0)
.transactionId(txo.id())
.term(term)
@@ -495,8 +503,9 @@ public class InternalTableImpl implements InternalTable {
return enlistInTx(
row,
tx,
- (txo, groupId, term) -> tableMessagesFactory.readWriteSingleRowReplicaRequest()
+ (commitPart, txo, groupId, term) -> tableMessagesFactory.readWriteSingleRowReplicaRequest()
.groupId(groupId)
+ .commitPartitionId(commitPart)
.binaryRow(row)
.transactionId(txo.id())
.term(term)
@@ -512,8 +521,9 @@ public class InternalTableImpl implements InternalTable {
return enlistInTx(
row,
tx,
- (txo, groupId, term) -> tableMessagesFactory.readWriteSingleRowReplicaRequest()
+ (commitPart, txo, groupId, term) -> tableMessagesFactory.readWriteSingleRowReplicaRequest()
.groupId(groupId)
+ .commitPartitionId(commitPart)
.binaryRow(row)
.transactionId(txo.id())
.term(term)
@@ -529,8 +539,9 @@ public class InternalTableImpl implements InternalTable {
return enlistInTx(
rows,
tx,
- (keyRows0, txo, groupId, term) -> tableMessagesFactory.readWriteMultiRowReplicaRequest()
+ (commitPart, keyRows0, txo, groupId, term) -> tableMessagesFactory.readWriteMultiRowReplicaRequest()
.groupId(groupId)
+ .commitPartitionId(commitPart)
.binaryRows(keyRows0)
.transactionId(txo.id())
.term(term)
@@ -546,8 +557,9 @@ public class InternalTableImpl implements InternalTable {
return enlistInTx(
row,
tx,
- (txo, groupId, term) -> tableMessagesFactory.readWriteSingleRowReplicaRequest()
+ (commitPart, txo, groupId, term) -> tableMessagesFactory.readWriteSingleRowReplicaRequest()
.groupId(groupId)
+ .commitPartitionId(commitPart)
.binaryRow(row)
.transactionId(txo.id())
.term(term)
@@ -563,8 +575,9 @@ public class InternalTableImpl implements InternalTable {
return enlistInTx(
newRow,
tx,
- (txo, groupId, term) -> tableMessagesFactory.readWriteSwapRowReplicaRequest()
+ (commitPart, txo, groupId, term) -> tableMessagesFactory.readWriteSwapRowReplicaRequest()
.groupId(groupId)
+ .commitPartitionId(commitPart)
.oldBinaryRow(oldRow)
.binaryRow(newRow)
.transactionId(txo.id())
@@ -581,8 +594,9 @@ public class InternalTableImpl implements InternalTable {
return enlistInTx(
row,
tx,
- (txo, groupId, term) -> tableMessagesFactory.readWriteSingleRowReplicaRequest()
+ (commitPart, txo, groupId, term) -> tableMessagesFactory.readWriteSingleRowReplicaRequest()
.groupId(groupId)
+ .commitPartitionId(commitPart)
.binaryRow(row)
.transactionId(txo.id())
.term(term)
@@ -598,8 +612,9 @@ public class InternalTableImpl implements InternalTable {
return enlistInTx(
keyRow,
tx,
- (txo, groupId, term) -> tableMessagesFactory.readWriteSingleRowReplicaRequest()
+ (commitPart, txo, groupId, term) -> tableMessagesFactory.readWriteSingleRowReplicaRequest()
.groupId(groupId)
+ .commitPartitionId(commitPart)
.binaryRow(keyRow)
.transactionId(txo.id())
.term(term)
@@ -615,8 +630,9 @@ public class InternalTableImpl implements InternalTable {
return enlistInTx(
oldRow,
tx,
- (txo, groupId, term) -> tableMessagesFactory.readWriteSingleRowReplicaRequest()
+ (commitPart, txo, groupId, term) -> tableMessagesFactory.readWriteSingleRowReplicaRequest()
.groupId(groupId)
+ .commitPartitionId(commitPart)
.binaryRow(oldRow)
.transactionId(txo.id())
.term(term)
@@ -632,8 +648,9 @@ public class InternalTableImpl implements InternalTable {
return enlistInTx(
row,
tx,
- (txo, groupId, term) -> tableMessagesFactory.readWriteSingleRowReplicaRequest()
+ (commitPart, txo, groupId, term) -> tableMessagesFactory.readWriteSingleRowReplicaRequest()
.groupId(groupId)
+ .commitPartitionId(commitPart)
.binaryRow(row)
.transactionId(txo.id())
.term(term)
@@ -649,8 +666,9 @@ public class InternalTableImpl implements InternalTable {
return enlistInTx(
rows,
tx,
- (keyRows0, txo, groupId, term) -> tableMessagesFactory.readWriteMultiRowReplicaRequest()
+ (commitPart, keyRows0, txo, groupId, term) -> tableMessagesFactory.readWriteMultiRowReplicaRequest()
.groupId(groupId)
+ .commitPartitionId(commitPart)
.binaryRows(keyRows0)
.transactionId(txo.id())
.term(term)
@@ -669,8 +687,9 @@ public class InternalTableImpl implements InternalTable {
return enlistInTx(
rows,
tx,
- (keyRows0, txo, groupId, term) -> tableMessagesFactory.readWriteMultiRowReplicaRequest()
+ (commitPart, keyRows0, txo, groupId, term) -> tableMessagesFactory.readWriteMultiRowReplicaRequest()
.groupId(groupId)
+ .commitPartitionId(commitPart)
.binaryRows(keyRows0)
.transactionId(txo.id())
.term(term)
@@ -849,6 +868,7 @@ public class InternalTableImpl implements InternalTable {
*/
protected CompletableFuture<IgniteBiTuple<ClusterNode, Long>> enlist(int partId, InternalTransaction tx) {
RaftGroupService svc = partitionMap.get(partId);
+ tx.assignCommitPartition(new TablePartitionId(tableId, partId));
// TODO: IGNITE-17256 Use a placement driver for getting a primary replica.
CompletableFuture<IgniteBiTuple<Peer, Long>> fut0 = svc.refreshAndGetLeaderWithTerm();
@@ -860,7 +880,9 @@ public class InternalTableImpl implements InternalTable {
throw new TransactionException("Failed to get the primary replica.");
}
- return tx.enlist(svc.groupId(),
+ TablePartitionId partGroupId = new TablePartitionId(tableId, partId);
+
+ return tx.enlist(partGroupId,
new IgniteBiTuple<>(clusterNodeResolver.apply(primaryPeerAndTerm.get1().address()), primaryPeerAndTerm.get2()));
});
}
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/utils/RebalanceUtil.java b/modules/table/src/main/java/org/apache/ignite/internal/utils/RebalanceUtil.java
index 3ce5cd1f03..9d00139d07 100644
--- a/modules/table/src/main/java/org/apache/ignite/internal/utils/RebalanceUtil.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/utils/RebalanceUtil.java
@@ -46,6 +46,7 @@ import org.apache.ignite.internal.metastorage.client.Entry;
import org.apache.ignite.internal.metastorage.client.If;
import org.apache.ignite.internal.metastorage.client.Operations;
import org.apache.ignite.internal.metastorage.client.WatchEvent;
+import org.apache.ignite.internal.table.distributed.replicator.TablePartitionId;
import org.apache.ignite.internal.util.ByteUtils;
import org.apache.ignite.lang.ByteArray;
import org.apache.ignite.lang.IgniteInternalException;
@@ -94,7 +95,7 @@ public class RebalanceUtil {
* @return Future representing result of updating keys in {@code metaStorageMgr}
*/
public static @NotNull CompletableFuture<Void> updatePendingAssignmentsKeys(
- String tableName, String partId, Collection<ClusterNode> baselineNodes,
+ String tableName, TablePartitionId partId, Collection<ClusterNode> baselineNodes,
int replicas, long revision, MetaStorageManager metaStorageMgr, int partNum) {
ByteArray partChangeTriggerKey = partChangeTriggerKey(partId);
@@ -186,7 +187,7 @@ public class RebalanceUtil {
* @return Key for a partition.
* @see <a href="https://github.com/apache/ignite-3/blob/main/modules/table/tech-notes/rebalance.md">Rebalance documentation</a>
*/
- public static ByteArray partChangeTriggerKey(String partId) {
+ public static ByteArray partChangeTriggerKey(TablePartitionId partId) {
return new ByteArray(partId + ".change.trigger");
}
@@ -197,7 +198,7 @@ public class RebalanceUtil {
* @return Key for a partition.
* @see <a href="https://github.com/apache/ignite-3/blob/main/modules/table/tech-notes/rebalance.md">Rebalance documentation</a>
*/
- public static ByteArray pendingPartAssignmentsKey(String partId) {
+ public static ByteArray pendingPartAssignmentsKey(TablePartitionId partId) {
return new ByteArray(PENDING_ASSIGNMENTS_PREFIX + partId);
}
@@ -208,7 +209,7 @@ public class RebalanceUtil {
* @return Key for a partition.
* @see <a href="https://github.com/apache/ignite-3/blob/main/modules/table/tech-notes/rebalance.md">Rebalance documentation</a>
*/
- public static ByteArray plannedPartAssignmentsKey(String partId) {
+ public static ByteArray plannedPartAssignmentsKey(TablePartitionId partId) {
return new ByteArray("assignments.planned." + partId);
}
@@ -219,7 +220,7 @@ public class RebalanceUtil {
* @return Key for a partition.
* @see <a href="https://github.com/apache/ignite-3/blob/main/modules/table/tech-notes/rebalance.md">Rebalance documentation</a>
*/
- public static ByteArray stablePartAssignmentsKey(String partId) {
+ public static ByteArray stablePartAssignmentsKey(TablePartitionId partId) {
return new ByteArray(STABLE_ASSIGNMENTS_PREFIX + partId);
}
@@ -230,7 +231,7 @@ public class RebalanceUtil {
* @return Key for a partition.
* @see <a href="https://github.com/apache/ignite-3/blob/main/modules/table/tech-notes/rebalance.md">Rebalance documentation</a>
*/
- public static ByteArray switchReduceKey(String partId) {
+ public static ByteArray switchReduceKey(TablePartitionId partId) {
return new ByteArray(ASSIGNMENTS_SWITCH_REDUCE_PREFIX + partId);
}
@@ -241,7 +242,7 @@ public class RebalanceUtil {
* @return Key for a partition.
* @see <a href="https://github.com/apache/ignite-3/blob/main/modules/table/tech-notes/rebalance.md">Rebalance documentation</a>
*/
- public static ByteArray switchAppendKey(String partId) {
+ public static ByteArray switchAppendKey(TablePartitionId partId) {
return new ByteArray(ASSIGNMENTS_SWITCH_APPEND_PREFIX + partId);
}
@@ -300,7 +301,11 @@ public class RebalanceUtil {
* @param metaStorageMgr MetaStorage manager.
* @return Completable future that signifies the completion of this operation.
*/
- public static CompletableFuture<Void> startPeerRemoval(String partId, ClusterNode clusterNode, MetaStorageManager metaStorageMgr) {
+ public static CompletableFuture<Void> startPeerRemoval(
+ TablePartitionId partId,
+ ClusterNode clusterNode,
+ MetaStorageManager metaStorageMgr
+ ) {
ByteArray key = switchReduceKey(partId);
return metaStorageMgr.get(key)
@@ -339,7 +344,7 @@ public class RebalanceUtil {
/**
* Handles assignments switch reduce changed updating pending assignments if there is no rebalancing in progress.
- * If there is rebalancing in progress, then new assignments will be applied when rebalance finishes.
+ * If there is rebalancing in progress, then new assignments will be applied when rebalance finishes.
*
* @param metaStorageMgr MetaStorage manager.
* @param baselineNodes Baseline nodes.
@@ -350,7 +355,7 @@ public class RebalanceUtil {
* @return Completable future that signifies the completion of this operation.
*/
public static CompletableFuture<Void> handleReduceChanged(MetaStorageManager metaStorageMgr, Collection<ClusterNode> baselineNodes,
- int replicas, int partNum, String partId, WatchEvent event) {
+ int replicas, int partNum, TablePartitionId partId, WatchEvent event) {
Entry entry = event.entryEvent().newEntry();
byte[] eventData = entry.value();
diff --git a/modules/table/src/test/java/org/apache/ignite/internal/table/TxAbstractTest.java b/modules/table/src/test/java/org/apache/ignite/internal/table/TxAbstractTest.java
index 49861338b7..08c67303b1 100644
--- a/modules/table/src/test/java/org/apache/ignite/internal/table/TxAbstractTest.java
+++ b/modules/table/src/test/java/org/apache/ignite/internal/table/TxAbstractTest.java
@@ -99,10 +99,10 @@ public abstract class TxAbstractTest extends IgniteAbstractTest {
);
/** Accounts table id -> balance. */
- protected Table accounts;
+ protected TableImpl accounts;
/** Customers table id -> name. */
- protected Table customers;
+ protected TableImpl customers;
protected static final double BALANCE_1 = 500;
@@ -1730,7 +1730,7 @@ public abstract class TxAbstractTest extends IgniteAbstractTest {
* @param partId Partition id.
* @return {@code True} if the replicas are the same.
*/
- protected abstract boolean assertPartitionsSame(Table table, int partId);
+ protected abstract boolean assertPartitionsSame(TableImpl table, int partId);
/**
* Validates balances.
diff --git a/modules/table/src/test/java/org/apache/ignite/internal/table/TxLocalTest.java b/modules/table/src/test/java/org/apache/ignite/internal/table/TxLocalTest.java
index bdea888ab8..c5fa5638f9 100644
--- a/modules/table/src/test/java/org/apache/ignite/internal/table/TxLocalTest.java
+++ b/modules/table/src/test/java/org/apache/ignite/internal/table/TxLocalTest.java
@@ -28,6 +28,7 @@ import java.util.Map;
import java.util.concurrent.CompletableFuture;
import org.apache.ignite.hlc.HybridClock;
import org.apache.ignite.internal.replicator.ReplicaService;
+import org.apache.ignite.internal.replicator.ReplicationGroupId;
import org.apache.ignite.internal.replicator.listener.ReplicaListener;
import org.apache.ignite.internal.replicator.message.ReplicaRequest;
import org.apache.ignite.internal.table.impl.DummyInternalTableImpl;
@@ -64,7 +65,7 @@ public class TxLocalTest extends TxAbstractTest {
ReplicaService replicaSvc = mock(ReplicaService.class, RETURNS_DEEP_STUBS);
- Map<String, DummyInternalTableImpl> tables = new HashMap<>();
+ Map<ReplicationGroupId, DummyInternalTableImpl> tables = new HashMap<>();
lenient().doAnswer(
invocationOnMock -> {
@@ -111,7 +112,7 @@ public class TxLocalTest extends TxAbstractTest {
}
@Override
- protected boolean assertPartitionsSame(Table table, int partId) {
+ protected boolean assertPartitionsSame(TableImpl table, int partId) {
return true;
}
}
diff --git a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerTest.java b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerTest.java
index 1d02422b16..91e59b116b 100644
--- a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerTest.java
+++ b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerTest.java
@@ -35,7 +35,6 @@ import static org.junit.jupiter.api.Assertions.fail;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.ArgumentMatchers.anyLong;
-import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.mockStatic;
@@ -98,6 +97,7 @@ import org.apache.ignite.internal.storage.rocksdb.RocksDbStorageEngine;
import org.apache.ignite.internal.storage.rocksdb.configuration.schema.RocksDbDataStorageChange;
import org.apache.ignite.internal.storage.rocksdb.configuration.schema.RocksDbStorageEngineConfiguration;
import org.apache.ignite.internal.table.TableImpl;
+import org.apache.ignite.internal.table.distributed.replicator.TablePartitionId;
import org.apache.ignite.internal.table.event.TableEvent;
import org.apache.ignite.internal.testframework.IgniteAbstractTest;
import org.apache.ignite.internal.thread.NamedThreadFactory;
@@ -441,14 +441,14 @@ public class TableManagerTest extends IgniteAbstractTest {
mockManagersAndCreateTable(scmTbl, tblManagerFut);
- verify(rm, times(PARTITIONS)).startRaftGroupService(anyString(), any());
+ verify(rm, times(PARTITIONS)).startRaftGroupService(any(), any());
TableManager tableManager = tblManagerFut.join();
tableManager.stop();
- verify(rm, times(PARTITIONS)).stopRaftGroup(anyString());
- verify(replicaMgr, times(PARTITIONS)).stopReplica(anyString());
+ verify(rm, times(PARTITIONS)).stopRaftGroup(any());
+ verify(replicaMgr, times(PARTITIONS)).stopReplica(any());
}
/**
@@ -655,7 +655,7 @@ public class TableManagerTest extends IgniteAbstractTest {
ScheduledExecutorService executor = new ScheduledThreadPoolExecutor(20, new NamedThreadFactory(Loza.CLIENT_POOL_NAME, LOG));
- String groupId = "test";
+ TablePartitionId groupId = new TablePartitionId(UUID.randomUUID(), 0);
List<String> shrunkPeers = peersToIds(nodes.subList(0, 1));
@@ -669,7 +669,7 @@ public class TableManagerTest extends IgniteAbstractTest {
eq(factory.changePeersAsyncRequest()
.newPeersList(shrunkPeers)
.term(1L)
- .groupId(groupId).build()), anyLong()))
+ .groupId(groupId.toString()).build()), anyLong()))
.then(invocation -> {
if (firstInvocationOfChangePeersAsync.get() == 0) {
firstInvocationOfChangePeersAsync.set(System.currentTimeMillis());
@@ -710,7 +710,7 @@ public class TableManagerTest extends IgniteAbstractTest {
eq(factory.changePeersAsyncRequest()
.newPeersList(shrunkPeers)
.term(1L)
- .groupId(groupId).build()), anyLong()))
+ .groupId(groupId.toString()).build()), anyLong()))
.then(invocation -> {
if (secondInvocationOfChangePeersAsync.get() == 0) {
secondInvocationOfChangePeersAsync.set(System.currentTimeMillis());
diff --git a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/command/PartitionRaftCommandsSerializationTest.java b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/command/PartitionRaftCommandsSerializationTest.java
index 4ead758d00..d828c4c80e 100644
--- a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/command/PartitionRaftCommandsSerializationTest.java
+++ b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/command/PartitionRaftCommandsSerializationTest.java
@@ -34,6 +34,7 @@ import java.util.Map;
import java.util.Objects;
import java.util.UUID;
import org.apache.ignite.hlc.HybridClock;
+import org.apache.ignite.internal.replicator.ReplicationGroupId;
import org.apache.ignite.internal.schema.BinaryRow;
import org.apache.ignite.internal.schema.Column;
import org.apache.ignite.internal.schema.NativeTypes;
@@ -41,6 +42,7 @@ import org.apache.ignite.internal.schema.SchemaDescriptor;
import org.apache.ignite.internal.schema.marshaller.KvMarshaller;
import org.apache.ignite.internal.schema.marshaller.reflection.ReflectionMarshallerFactory;
import org.apache.ignite.internal.storage.RowId;
+import org.apache.ignite.internal.table.distributed.replicator.TablePartitionId;
import org.apache.ignite.internal.testframework.IgniteAbstractTest;
import org.apache.ignite.internal.tostring.IgniteToStringInclude;
import org.apache.ignite.internal.tostring.S;
@@ -71,7 +73,12 @@ public class PartitionRaftCommandsSerializationTest extends IgniteAbstractTest {
@Test
public void testUpdateCommand() throws Exception {
- UpdateCommand cmd = new UpdateCommand(new RowId(1), binaryRow(1), UUID.randomUUID());
+ UpdateCommand cmd = new UpdateCommand(
+ new TablePartitionId(UUID.randomUUID(), 1),
+ new RowId(1),
+ binaryRow(1),
+ UUID.randomUUID()
+ );
UpdateCommand readCmd = copyCommand(cmd);
@@ -82,7 +89,7 @@ public class PartitionRaftCommandsSerializationTest extends IgniteAbstractTest {
@Test
public void testRemoveCommand() throws Exception {
- UpdateCommand cmd = new UpdateCommand(new RowId(1), UUID.randomUUID());
+ UpdateCommand cmd = new UpdateCommand(new TablePartitionId(UUID.randomUUID(), 1), new RowId(1), UUID.randomUUID());
UpdateCommand readCmd = copyCommand(cmd);
@@ -99,7 +106,7 @@ public class PartitionRaftCommandsSerializationTest extends IgniteAbstractTest {
rowsToUpdate.put(new RowId(i), binaryRow(i));
}
- var cmd = new UpdateAllCommand(rowsToUpdate, UUID.randomUUID());
+ var cmd = new UpdateAllCommand(new TablePartitionId(UUID.randomUUID(), 1), rowsToUpdate, UUID.randomUUID());
UpdateAllCommand readCmd = copyCommand(cmd);
@@ -123,7 +130,7 @@ public class PartitionRaftCommandsSerializationTest extends IgniteAbstractTest {
rowsToRemove.add(new RowId(i));
}
- var cmd = new UpdateAllCommand(rowsToRemove, UUID.randomUUID());
+ var cmd = new UpdateAllCommand(new TablePartitionId(UUID.randomUUID(), 1), rowsToRemove, UUID.randomUUID());
UpdateAllCommand readCmd = copyCommand(cmd);
@@ -151,10 +158,10 @@ public class PartitionRaftCommandsSerializationTest extends IgniteAbstractTest {
@Test
public void testFinishTxCommand() throws Exception {
HybridClock clock = new HybridClock();
- ArrayList<String> grps = new ArrayList<String>(10);
+ ArrayList<ReplicationGroupId> grps = new ArrayList<>(10);
for (int i = 0; i < 10; i++) {
- grps.add("grp-" + i);
+ grps.add(new TablePartitionId(UUID.randomUUID(), i));
}
FinishTxCommand cmd = new FinishTxCommand(UUID.randomUUID(), true, clock.now(), grps);
diff --git a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/PartitionCommandListenerTest.java b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/PartitionCommandListenerTest.java
index a19307b866..ee1df2e071 100644
--- a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/PartitionCommandListenerTest.java
+++ b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/PartitionCommandListenerTest.java
@@ -54,6 +54,7 @@ import org.apache.ignite.internal.storage.RowId;
import org.apache.ignite.internal.storage.impl.TestMvPartitionStorage;
import org.apache.ignite.internal.table.distributed.command.UpdateAllCommand;
import org.apache.ignite.internal.table.distributed.command.UpdateCommand;
+import org.apache.ignite.internal.table.distributed.replicator.TablePartitionId;
import org.apache.ignite.internal.tx.Timestamp;
import org.apache.ignite.internal.tx.impl.HeapLockManager;
import org.apache.ignite.internal.tx.impl.TxManagerImpl;
@@ -267,6 +268,7 @@ public class PartitionCommandListenerTest {
HashMap<RowId, BinaryRow> rows = new HashMap<>(KEY_COUNT);
UUID txId = Timestamp.nextVersion().toUuid();
+ var commitPartId = new TablePartitionId(txId, PARTITION_ID);
for (int i = 0; i < KEY_COUNT; i++) {
Row row = getTestRow(i, i);
@@ -276,7 +278,7 @@ public class PartitionCommandListenerTest {
txs.add(new IgniteBiTuple<>(row, txId));
}
- when(clo.command()).thenReturn(new UpdateAllCommand(rows, txId));
+ when(clo.command()).thenReturn(new UpdateAllCommand(commitPartId, rows, txId));
}));
txs.forEach(tuple -> mvPartitionStorage.commitWrite(primaryIndex.get(tuple.getKey().keySlice()), CLOCK.now()));
@@ -302,6 +304,7 @@ public class PartitionCommandListenerTest {
HashMap<RowId, BinaryRow> rows = new HashMap<>(KEY_COUNT);
UUID txId = Timestamp.nextVersion().toUuid();
+ var commitPartId = new TablePartitionId(txId, PARTITION_ID);
for (int i = 0; i < KEY_COUNT; i++) {
Row row = getTestRow(i, keyValueMapper.apply(i));
@@ -311,7 +314,7 @@ public class PartitionCommandListenerTest {
txs.add(new IgniteBiTuple<>(row, txId));
}
- when(clo.command()).thenReturn(new UpdateAllCommand(rows, txId));
+ when(clo.command()).thenReturn(new UpdateAllCommand(commitPartId, rows, txId));
}));
txs.forEach(tuple -> mvPartitionStorage.commitWrite(primaryIndex.get(tuple.getKey().keySlice()), CLOCK.now()));
@@ -335,6 +338,7 @@ public class PartitionCommandListenerTest {
Set<RowId> keyRows = new HashSet<>(KEY_COUNT);
UUID txId = Timestamp.nextVersion().toUuid();
+ var commitPartId = new TablePartitionId(txId, PARTITION_ID);
for (int i = 0; i < KEY_COUNT; i++) {
Row row = getTestRow(i, i);
@@ -344,7 +348,7 @@ public class PartitionCommandListenerTest {
txs.add(new IgniteBiTuple<>(row, txId));
}
- when(clo.command()).thenReturn(new UpdateAllCommand(keyRows, txId));
+ when(clo.command()).thenReturn(new UpdateAllCommand(commitPartId, keyRows, txId));
}));
txs.forEach(
@@ -363,6 +367,7 @@ public class PartitionCommandListenerTest {
UUID txId = Timestamp.nextVersion().toUuid();
Row row = getTestRow(i, keyValueMapper.apply(i));
RowId rowId = primaryIndex.get(row.keySlice());
+ var commitPartId = new TablePartitionId(txId, PARTITION_ID);
assertNotNull(rowId);
@@ -370,7 +375,7 @@ public class PartitionCommandListenerTest {
when(clo.index()).thenReturn(raftIndex.incrementAndGet());
- when(clo.command()).thenReturn(new UpdateCommand(rowId, row, txId));
+ when(clo.command()).thenReturn(new UpdateCommand(commitPartId, rowId, row, txId));
doAnswer(invocation -> {
assertNull(invocation.getArgument(0));
@@ -392,6 +397,7 @@ public class PartitionCommandListenerTest {
UUID txId = Timestamp.nextVersion().toUuid();
Row row = getTestRow(i, i);
RowId rowId = primaryIndex.get(row.keySlice());
+ var commitPartId = new TablePartitionId(txId, PARTITION_ID);
assertNotNull(rowId);
@@ -399,7 +405,7 @@ public class PartitionCommandListenerTest {
when(clo.index()).thenReturn(raftIndex.incrementAndGet());
- when(clo.command()).thenReturn(new UpdateCommand(rowId, txId));
+ when(clo.command()).thenReturn(new UpdateCommand(commitPartId, rowId, txId));
doAnswer(invocation -> {
assertNull(invocation.getArgument(0));
@@ -455,11 +461,12 @@ public class PartitionCommandListenerTest {
commandListener.onWrite(iterator((i, clo) -> {
UUID txId = Timestamp.nextVersion().toUuid();
Row row = getTestRow(i, i);
+ var commitPartId = new TablePartitionId(txId, PARTITION_ID);
txs.add(new IgniteBiTuple<>(row, txId));
when(clo.index()).thenReturn(raftIndex.incrementAndGet());
- when(clo.command()).thenReturn(new UpdateCommand(new RowId(PARTITION_ID), row, txId));
+ when(clo.command()).thenReturn(new UpdateCommand(commitPartId, new RowId(PARTITION_ID), row, txId));
doAnswer(invocation -> {
assertNull(invocation.getArgument(0));
diff --git a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerTest.java b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerTest.java
index f25659b24a..dbf62d6d59 100644
--- a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerTest.java
+++ b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerTest.java
@@ -35,6 +35,7 @@ import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.ignite.hlc.HybridClock;
import org.apache.ignite.hlc.HybridTimestamp;
+import org.apache.ignite.internal.replicator.ReplicationGroupId;
import org.apache.ignite.internal.schema.BinaryRow;
import org.apache.ignite.internal.schema.Column;
import org.apache.ignite.internal.schema.NativeTypes;
@@ -49,6 +50,7 @@ import org.apache.ignite.internal.storage.impl.TestMvPartitionStorage;
import org.apache.ignite.internal.table.distributed.TableMessagesFactory;
import org.apache.ignite.internal.table.distributed.replicator.PartitionReplicaListener;
import org.apache.ignite.internal.table.distributed.replicator.PlacementDriver;
+import org.apache.ignite.internal.table.distributed.replicator.TablePartitionId;
import org.apache.ignite.internal.table.distributed.replicator.action.RequestType;
import org.apache.ignite.internal.testframework.IgniteAbstractTest;
import org.apache.ignite.internal.tostring.IgniteToStringInclude;
@@ -87,7 +89,7 @@ public class PartitionReplicaListenerTest extends IgniteAbstractTest {
private static final UUID tblId = UUID.randomUUID();
/** Replication group id. */
- private static final String grpId = tblId + "_part_" + partId;
+ private static final ReplicationGroupId grpId = new TablePartitionId(tblId, partId);
/** Primary index map. */
private static final ConcurrentHashMap<ByteBuffer, RowId> primaryIndex = new ConcurrentHashMap<>();
@@ -179,7 +181,6 @@ public class PartitionReplicaListenerTest extends IgniteAbstractTest {
mock(TxManager.class),
new HeapLockManager(),
partId,
- grpId,
tblId,
primaryIndex,
clock,
diff --git a/modules/table/src/test/java/org/apache/ignite/internal/table/impl/DummyInternalTableImpl.java b/modules/table/src/test/java/org/apache/ignite/internal/table/impl/DummyInternalTableImpl.java
index 9869656427..f30ac90119 100644
--- a/modules/table/src/test/java/org/apache/ignite/internal/table/impl/DummyInternalTableImpl.java
+++ b/modules/table/src/test/java/org/apache/ignite/internal/table/impl/DummyInternalTableImpl.java
@@ -32,6 +32,7 @@ import java.util.concurrent.atomic.AtomicLong;
import javax.naming.OperationNotSupportedException;
import org.apache.ignite.hlc.HybridClock;
import org.apache.ignite.internal.replicator.ReplicaService;
+import org.apache.ignite.internal.replicator.ReplicationGroupId;
import org.apache.ignite.internal.replicator.listener.ReplicaListener;
import org.apache.ignite.internal.schema.BinaryRow;
import org.apache.ignite.internal.schema.BinaryRowEx;
@@ -41,6 +42,7 @@ import org.apache.ignite.internal.storage.engine.MvTableStorage;
import org.apache.ignite.internal.storage.impl.TestMvPartitionStorage;
import org.apache.ignite.internal.table.distributed.raft.PartitionListener;
import org.apache.ignite.internal.table.distributed.replicator.PartitionReplicaListener;
+import org.apache.ignite.internal.table.distributed.replicator.TablePartitionId;
import org.apache.ignite.internal.table.distributed.storage.InternalTableImpl;
import org.apache.ignite.internal.tx.InternalTransaction;
import org.apache.ignite.internal.tx.TxManager;
@@ -68,11 +70,13 @@ import org.mockito.Mockito;
public class DummyInternalTableImpl extends InternalTableImpl {
public static final NetworkAddress ADDR = new NetworkAddress("127.0.0.1", 2004);
+ private static final ReplicationGroupId crossTableGroupId = new TablePartitionId(UUID.randomUUID(), 0);
+
private PartitionListener partitionListener;
private ReplicaListener replicaListener;
- private String groupId;
+ private ReplicationGroupId groupId;
/**
* Creates a new local table.
@@ -135,7 +139,7 @@ public class DummyInternalTableImpl extends InternalTableImpl {
);
RaftGroupService svc = partitionMap.get(0);
- groupId = crossTableUsage ? "testGrp-" + UUID.randomUUID() : "testGrp";
+ groupId = crossTableUsage ? new TablePartitionId(tableId(), 0) : crossTableGroupId;
lenient().doReturn(groupId).when(svc).groupId();
Peer leaderPeer = new Peer(ADDR);
@@ -206,7 +210,6 @@ public class DummyInternalTableImpl extends InternalTableImpl {
this.txManager,
this.txManager.lockManager(),
0,
- groupId,
tableId(),
primaryIndex,
new HybridClock(),
@@ -233,18 +236,12 @@ public class DummyInternalTableImpl extends InternalTableImpl {
return replicaListener;
}
- /** {@inheritDoc} */
- @Override
- public @NotNull UUID tableId() {
- return UUID.randomUUID();
- }
-
/**
* Group id of single partition of this table.
*
* @return Group id.
*/
- public String groupId() {
+ public ReplicationGroupId groupId() {
return groupId;
}
diff --git a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/InternalTransaction.java b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/InternalTransaction.java
index b6a5da7882..0baefbbc33 100644
--- a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/InternalTransaction.java
+++ b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/InternalTransaction.java
@@ -19,6 +19,7 @@ package org.apache.ignite.internal.tx;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
+import org.apache.ignite.internal.replicator.ReplicationGroupId;
import org.apache.ignite.lang.IgniteBiTuple;
import org.apache.ignite.network.ClusterNode;
import org.apache.ignite.tx.Transaction;
@@ -38,12 +39,12 @@ public interface InternalTransaction extends Transaction {
/**
* Returns enlisted primary replica node associated with given replication group.
*
- * @param partGroupId Replication group id.
+ * @param replicationGroupId Table partition id.
* @return Enlisted primary replica node and raft term associated with given replication group.
*/
// TODO: IGNITE-17256 IgniteBiTuple along with second parameter term will be removed after introducing leased based primary replica
// TODO: selection and failover engine.
- IgniteBiTuple<ClusterNode, Long> enlistedNodeAndTerm(String partGroupId);
+ IgniteBiTuple<ClusterNode, Long> enlistedNodeAndTerm(ReplicationGroupId replicationGroupId);
/**
* Returns a transaction state.
@@ -52,14 +53,29 @@ public interface InternalTransaction extends Transaction {
*/
TxState state();
+ /**
+ * Assigns a replication group id to store the transaction state.
+ *
+ * @param replicationGroupId Commit partition group id.
+ * @return {@code true} if this call assigned the commit partition, {@code false} if one was already assigned.
+ */
+ boolean assignCommitPartition(ReplicationGroupId replicationGroupId);
+
+ /**
+ * Gets a replication group id that stores the transaction state.
+ *
+ * @return Commit partition group id, or {@code null} if none has been assigned yet.
+ */
+ ReplicationGroupId commitPartition();
+
/**
* Enlists a partition group into a transaction.
*
- * @param replicationGroupId Replication group id to enlist.
+ * @param replicationGroupId Table partition id to enlist.
* @param nodeAndTerm Primary replica cluster node and raft term to enlist for given replication group.
* @return {@code True} if a partition is enlisted into the transaction.
*/
- IgniteBiTuple<ClusterNode, Long> enlist(String replicationGroupId, IgniteBiTuple<ClusterNode, Long> nodeAndTerm);
+ IgniteBiTuple<ClusterNode, Long> enlist(ReplicationGroupId replicationGroupId, IgniteBiTuple<ClusterNode, Long> nodeAndTerm);
/**
* Enlists operation future in transaction. It's used in order to wait corresponding tx operations before commit.
diff --git a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/TxManager.java b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/TxManager.java
index 992ff77bd6..cbacf446d2 100644
--- a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/TxManager.java
+++ b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/TxManager.java
@@ -23,6 +23,7 @@ import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import org.apache.ignite.hlc.HybridTimestamp;
import org.apache.ignite.internal.manager.IgniteComponent;
+import org.apache.ignite.internal.replicator.ReplicationGroupId;
import org.apache.ignite.lang.IgniteBiTuple;
import org.apache.ignite.network.ClusterNode;
import org.jetbrains.annotations.Nullable;
@@ -73,6 +74,7 @@ public interface TxManager extends IgniteComponent {
/**
* Finishes a dependant transactions.
*
+ * @param commitPartition Partition to store a transaction state.
* @param recipientNode Recipient node.
* @param term Raft term.
* @param commit {@code True} if a commit requested.
@@ -80,10 +82,11 @@ public interface TxManager extends IgniteComponent {
* @param txId Transaction id.
*/
CompletableFuture<Void> finish(
+ ReplicationGroupId commitPartition,
ClusterNode recipientNode,
Long term,
boolean commit,
- Map<ClusterNode, List<IgniteBiTuple<String, Long>>> groups,
+ Map<ClusterNode, List<IgniteBiTuple<ReplicationGroupId, Long>>> groups,
UUID txId
);
@@ -99,7 +102,7 @@ public interface TxManager extends IgniteComponent {
*/
CompletableFuture<Void> cleanup(
ClusterNode recipientNode,
- List<IgniteBiTuple<String, Long>> replicationGroupIds,
+ List<IgniteBiTuple<ReplicationGroupId, Long>> replicationGroupIds,
UUID txId,
boolean commit,
HybridTimestamp commitTimestamp
diff --git a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/TxMeta.java b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/TxMeta.java
index 4680d8feb5..450c868885 100644
--- a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/TxMeta.java
+++ b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/TxMeta.java
@@ -22,6 +22,7 @@ import static java.util.Collections.unmodifiableList;
import java.io.Serializable;
import java.util.List;
import org.apache.ignite.hlc.HybridTimestamp;
+import org.apache.ignite.internal.replicator.ReplicationGroupId;
import org.apache.ignite.internal.tostring.S;
/** Transaction meta. */
@@ -33,7 +34,7 @@ public class TxMeta implements Serializable {
private final TxState txState;
/** The list of enlisted partitions. */
- private final List<String> enlistedPartitions;
+ private final List<ReplicationGroupId> enlistedPartitions;
/** Commit timestamp. */
private final HybridTimestamp commitTimestamp;
@@ -45,7 +46,7 @@ public class TxMeta implements Serializable {
* @param enlistedPartitions The list of enlisted partitions.
* @param commitTimestamp Commit timestamp.
*/
- public TxMeta(TxState txState, List<String> enlistedPartitions, HybridTimestamp commitTimestamp) {
+ public TxMeta(TxState txState, List<ReplicationGroupId> enlistedPartitions, HybridTimestamp commitTimestamp) {
this.txState = txState;
this.enlistedPartitions = enlistedPartitions;
this.commitTimestamp = commitTimestamp;
@@ -55,7 +56,7 @@ public class TxMeta implements Serializable {
return txState;
}
- public List<String> enlistedPartitions() {
+ public List<ReplicationGroupId> enlistedPartitions() {
return unmodifiableList(enlistedPartitions);
}
diff --git a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TransactionImpl.java b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TransactionImpl.java
index 150a241e91..e740c8428d 100644
--- a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TransactionImpl.java
+++ b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TransactionImpl.java
@@ -27,10 +27,12 @@ import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ConcurrentSkipListMap;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.atomic.AtomicReference;
import org.apache.ignite.internal.logger.IgniteLogger;
import org.apache.ignite.internal.logger.Loggers;
+import org.apache.ignite.internal.replicator.ReplicationGroupId;
import org.apache.ignite.internal.tx.InternalTransaction;
import org.apache.ignite.internal.tx.TxManager;
import org.apache.ignite.internal.tx.TxState;
@@ -55,11 +57,14 @@ public class TransactionImpl implements InternalTransaction {
private final TxManager txManager;
/** Enlisted replication groups: replication group id -> (primary replica node, raft term). */
- private final Map<String, IgniteBiTuple<ClusterNode, Long>> enlisted = new ConcurrentSkipListMap<>();
+ private final Map<ReplicationGroupId, IgniteBiTuple<ClusterNode, Long>> enlisted = new ConcurrentHashMap<>();
/** Enlisted operation futures in this transaction. */
private final List<CompletableFuture<?>> enlistedResults = new CopyOnWriteArrayList<>();
+ /** Reference to the partition that stores the transaction state. */
+ private final AtomicReference<ReplicationGroupId> commitPartitionRef = new AtomicReference<>();
+
/**
* The constructor.
*
@@ -71,6 +76,18 @@ public class TransactionImpl implements InternalTransaction {
this.id = id;
}
+ /** {@inheritDoc} */
+ @Override
+ public boolean assignCommitPartition(ReplicationGroupId replicationGroupId) {
+ return commitPartitionRef.compareAndSet(null, replicationGroupId);
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public ReplicationGroupId commitPartition() {
+ return commitPartitionRef.get();
+ }
+
/** {@inheritDoc} */
@NotNull
@Override
@@ -80,7 +97,7 @@ public class TransactionImpl implements InternalTransaction {
/** {@inheritDoc} */
@Override
- public IgniteBiTuple<ClusterNode, Long> enlistedNodeAndTerm(String partGroupId) {
+ public IgniteBiTuple<ClusterNode, Long> enlistedNodeAndTerm(ReplicationGroupId partGroupId) {
return enlisted.get(partGroupId);
}
@@ -93,7 +110,7 @@ public class TransactionImpl implements InternalTransaction {
/** {@inheritDoc} */
@Override
- public IgniteBiTuple<ClusterNode, Long> enlist(String replicationGroupId, IgniteBiTuple<ClusterNode, Long> nodeAndTerm) {
+ public IgniteBiTuple<ClusterNode, Long> enlist(ReplicationGroupId replicationGroupId, IgniteBiTuple<ClusterNode, Long> nodeAndTerm) {
enlisted.put(replicationGroupId, nodeAndTerm);
return nodeAndTerm;
@@ -144,7 +161,7 @@ public class TransactionImpl implements InternalTransaction {
.thenCompose(
ignored -> {
if (!enlisted.isEmpty()) {
- Map<ClusterNode, List<IgniteBiTuple<String, Long>>> groups = new LinkedHashMap<>();
+ Map<ClusterNode, List<IgniteBiTuple<ReplicationGroupId, Long>>> groups = new LinkedHashMap<>();
enlisted.forEach((groupId, groupMeta) -> {
ClusterNode recipientNode = groupMeta.get1();
@@ -152,7 +169,7 @@ public class TransactionImpl implements InternalTransaction {
if (groups.containsKey(recipientNode)) {
groups.get(recipientNode).add(new IgniteBiTuple<>(groupId, groupMeta.get2()));
} else {
- List<IgniteBiTuple<String, Long>> items = new ArrayList<>();
+ List<IgniteBiTuple<ReplicationGroupId, Long>> items = new ArrayList<>();
items.add(new IgniteBiTuple<>(groupId, groupMeta.get2()));
@@ -160,13 +177,15 @@ public class TransactionImpl implements InternalTransaction {
}
});
- ClusterNode recipientNode = enlisted.entrySet().iterator().next().getValue().get1();
- Long term = enlisted.entrySet().iterator().next().getValue().get2();
+ ReplicationGroupId commitPart = commitPartitionRef.get();
+ ClusterNode recipientNode = enlisted.get(commitPart).get1();
+ Long term = enlisted.get(commitPart).get2();
- LOG.debug("Finish [recipientNode={}, term={} commit={}, txId={}, groups={}",
- recipientNode, term, commit, id, groups);
+ LOG.debug("Finish [recipientNode={}, term={} commit={}, txId={}, groups={} commitPart={}",
+ recipientNode, term, commit, id, groups, commitPart);
return txManager.finish(
+ commitPart,
recipientNode,
term,
commit,
diff --git a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxManagerImpl.java b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxManagerImpl.java
index de5a1fddb2..8e187438de 100644
--- a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxManagerImpl.java
+++ b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxManagerImpl.java
@@ -27,6 +27,7 @@ import java.util.concurrent.ConcurrentHashMap;
import org.apache.ignite.hlc.HybridClock;
import org.apache.ignite.hlc.HybridTimestamp;
import org.apache.ignite.internal.replicator.ReplicaService;
+import org.apache.ignite.internal.replicator.ReplicationGroupId;
import org.apache.ignite.internal.tx.InternalTransaction;
import org.apache.ignite.internal.tx.LockManager;
import org.apache.ignite.internal.tx.Timestamp;
@@ -102,10 +103,11 @@ public class TxManagerImpl implements TxManager {
/** {@inheritDoc} */
@Override
public CompletableFuture<Void> finish(
+ ReplicationGroupId commitPartition,
ClusterNode recipientNode,
Long term,
boolean commit,
- Map<ClusterNode, List<IgniteBiTuple<String, Long>>> groups,
+ Map<ClusterNode, List<IgniteBiTuple<ReplicationGroupId, Long>>> groups,
UUID txId
) {
assert groups != null && !groups.isEmpty();
@@ -114,7 +116,7 @@ public class TxManagerImpl implements TxManager {
TxFinishReplicaRequest req = FACTORY.txFinishReplicaRequest()
.txId(txId)
- .groupId(groups.values().iterator().next().get(0).get1())
+ .groupId(commitPartition)
.groups(groups)
.commit(commit)
.commitTimestamp(commitTimestamp)
@@ -130,7 +132,7 @@ public class TxManagerImpl implements TxManager {
@Override
public CompletableFuture<Void> cleanup(
ClusterNode recipientNode,
- List<IgniteBiTuple<String, Long>> replicationGroupIds,
+ List<IgniteBiTuple<ReplicationGroupId, Long>> replicationGroupIds,
UUID txId,
boolean commit,
HybridTimestamp commitTimestamp
diff --git a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/message/TxFinishReplicaRequest.java b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/message/TxFinishReplicaRequest.java
index d604a93a46..c1bcee3451 100644
--- a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/message/TxFinishReplicaRequest.java
+++ b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/message/TxFinishReplicaRequest.java
@@ -21,6 +21,7 @@ import java.util.List;
import java.util.Map;
import java.util.UUID;
import org.apache.ignite.hlc.HybridTimestamp;
+import org.apache.ignite.internal.replicator.ReplicationGroupId;
import org.apache.ignite.internal.replicator.message.ReplicaRequest;
import org.apache.ignite.internal.replicator.message.TimestampAware;
import org.apache.ignite.lang.IgniteBiTuple;
@@ -66,7 +67,7 @@ public interface TxFinishReplicaRequest extends ReplicaRequest, TimestampAware {
* @return Enlisted partition groups aggregated by expected primary replica nodes.
*/
@Marshallable
- Map<ClusterNode, List<IgniteBiTuple<String, Long>>> groups();
+ Map<ClusterNode, List<IgniteBiTuple<ReplicationGroupId, Long>>> groups();
/**
* Gets a raft term.
diff --git a/modules/transactions/src/test/java/org/apache/ignite/internal/tx/TxManagerTest.java b/modules/transactions/src/test/java/org/apache/ignite/internal/tx/TxManagerTest.java
index 63e38565f9..907335cbd5 100644
--- a/modules/transactions/src/test/java/org/apache/ignite/internal/tx/TxManagerTest.java
+++ b/modules/transactions/src/test/java/org/apache/ignite/internal/tx/TxManagerTest.java
@@ -23,9 +23,11 @@ import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.Answers.RETURNS_DEEP_STUBS;
+import java.util.Objects;
import java.util.UUID;
import org.apache.ignite.hlc.HybridClock;
import org.apache.ignite.internal.replicator.ReplicaService;
+import org.apache.ignite.internal.replicator.ReplicationGroupId;
import org.apache.ignite.internal.testframework.IgniteAbstractTest;
import org.apache.ignite.internal.tx.impl.HeapLockManager;
import org.apache.ignite.internal.tx.impl.TxManagerImpl;
@@ -83,7 +85,7 @@ public class TxManagerTest extends IgniteAbstractTest {
InternalTransaction tx = txManager.begin();
- String replicationGroupName = "ReplicationGroupName";
+ ReplicationGroupId replicationGroupName = new TestReplicationGroupId(1);
ClusterNode node = Mockito.mock(ClusterNode.class);
@@ -106,4 +108,43 @@ public class TxManagerTest extends IgniteAbstractTest {
assertTrue(txId3.compareTo(txId2) > 0);
assertTrue(txId4.compareTo(txId3) > 0);
}
+
+ /**
+ * Test implementation of replication group id.
+ */
+ private static class TestReplicationGroupId implements ReplicationGroupId {
+ /** Partition id. */
+ private final int prtId;
+
+ /**
+ * The constructor.
+ *
+ * @param prtId Partition id.
+ */
+ public TestReplicationGroupId(int prtId) {
+ this.prtId = prtId;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ TestReplicationGroupId that = (TestReplicationGroupId) o;
+ return prtId == that.prtId;
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(prtId);
+ }
+
+ @Override
+ public String toString() {
+ return "part_" + prtId;
+ }
+ }
}
diff --git a/modules/transactions/src/test/java/org/apache/ignite/internal/tx/storage/state/TxStateStorageAbstractTest.java b/modules/transactions/src/test/java/org/apache/ignite/internal/tx/storage/state/TxStateStorageAbstractTest.java
index 7d68b1e3b6..2361df443d 100644
--- a/modules/transactions/src/test/java/org/apache/ignite/internal/tx/storage/state/TxStateStorageAbstractTest.java
+++ b/modules/transactions/src/test/java/org/apache/ignite/internal/tx/storage/state/TxStateStorageAbstractTest.java
@@ -29,9 +29,11 @@ import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.Objects;
import java.util.UUID;
import java.util.stream.IntStream;
import org.apache.ignite.hlc.HybridTimestamp;
+import org.apache.ignite.internal.replicator.ReplicationGroupId;
import org.apache.ignite.internal.testframework.WorkDirectory;
import org.apache.ignite.internal.testframework.WorkDirectoryExtension;
import org.apache.ignite.internal.tx.TxMeta;
@@ -89,8 +91,8 @@ public abstract class TxStateStorageAbstractTest {
}
}
- private List<String> generateEnlistedPartitions(int c) {
- return IntStream.range(0, c).mapToObj(String::valueOf).collect(toList());
+ private List<ReplicationGroupId> generateEnlistedPartitions(int c) {
+ return IntStream.range(0, c).mapToObj(i -> new TestReplicationGroupId(i)).collect(toList());
}
private HybridTimestamp generateTimestamp(UUID uuid) {
@@ -216,4 +218,43 @@ public abstract class TxStateStorageAbstractTest {
* @return Tx state storage.
*/
protected abstract TxStateTableStorage createStorage();
+
+ /**
+ * Test implementation of replication group id.
+ */
+ private static class TestReplicationGroupId implements ReplicationGroupId {
+ /** Partition id. */
+ private final int prtId;
+
+ /**
+ * The constructor.
+ *
+ * @param prtId Partition id.
+ */
+ public TestReplicationGroupId(int prtId) {
+ this.prtId = prtId;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ TestReplicationGroupId that = (TestReplicationGroupId) o;
+ return prtId == that.prtId;
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(prtId);
+ }
+
+ @Override
+ public String toString() {
+ return "part_" + prtId;
+ }
+ }
}