You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by vp...@apache.org on 2023/12/12 12:15:39 UTC
(ignite-3) branch main updated: IGNITE-21060 Extract ClusterNodeResolver as a separate entity (#2946)
This is an automated email from the ASF dual-hosted git repository.
vpyatkov pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new 492f01b642 IGNITE-21060 Extract ClusterNodeResolver as a separate entity (#2946)
492f01b642 is described below
commit 492f01b6423c3514cbe4b3ac45b3d84e70080318
Author: Cyrill <cy...@gmail.com>
AuthorDate: Tue Dec 12 15:15:33 2023 +0300
IGNITE-21060 Extract ClusterNodeResolver as a separate entity (#2946)
---
...pologyService.java => ClusterNodeResolver.java} | 37 ++++-----------
.../org/apache/ignite/network/TopologyService.java | 18 +-------
.../ignite/network/SingleClusterNodeResolver.java | 45 ++++++++++++++++++
modules/sql-engine/build.gradle | 1 +
.../exec/rel/TableScanNodeExecutionTest.java | 3 +-
modules/table/build.gradle | 3 ++
.../ignite/internal/table/ItColocationTest.java | 3 +-
.../PartitionReplicatorNodeRecovery.java | 12 ++---
.../internal/table/distributed/TableManager.java | 11 +----
.../replicator/TransactionStateResolver.java | 18 +++-----
.../distributed/storage/InternalTableImpl.java | 15 +++---
.../replication/PartitionReplicaListenerTest.java | 16 ++++++-
.../distributed/storage/InternalTableImplTest.java | 5 +-
.../apache/ignite/distributed/ItTxTestCluster.java | 54 ++++++++++++----------
.../table/impl/DummyInternalTableImpl.java | 3 +-
15 files changed, 130 insertions(+), 114 deletions(-)
diff --git a/modules/api/src/main/java/org/apache/ignite/network/TopologyService.java b/modules/api/src/main/java/org/apache/ignite/network/ClusterNodeResolver.java
similarity index 57%
copy from modules/api/src/main/java/org/apache/ignite/network/TopologyService.java
copy to modules/api/src/main/java/org/apache/ignite/network/ClusterNodeResolver.java
index 13b647e92d..773b369b1d 100644
--- a/modules/api/src/main/java/org/apache/ignite/network/TopologyService.java
+++ b/modules/api/src/main/java/org/apache/ignite/network/ClusterNodeResolver.java
@@ -17,42 +17,23 @@
package org.apache.ignite.network;
-import java.util.Collection;
import org.jetbrains.annotations.Nullable;
/**
- * Entry point for obtaining physical cluster topology information.
+ * Resolves cluster nodes either by their node ID or by their consistent ID.
*/
-// TODO: allow removing event handlers, see https://issues.apache.org/jira/browse/IGNITE-14519
-public interface TopologyService {
+public interface ClusterNodeResolver {
/**
- * Returns information about the current node.
+ * Returns a cluster node consistent ID by its node ID.
*
- * @return Information about the local network member.
- */
- ClusterNode localMember();
-
- /**
- * Returns a list of all discovered cluster members, including the local member itself.
- *
- * @return List of the discovered cluster members.
- */
- Collection<ClusterNode> allMembers();
-
- /**
- * Registers a handler for physical topology change events.
- *
- * @param handler Physical topology event handler.
+ * @param id Node ID.
+ * @return The consistent ID; {@code null} if the node has not been discovered or is offline.
*/
- void addEventHandler(TopologyEventHandler handler);
+ default @Nullable String getConsistentIdById(String id) {
+ ClusterNode node = getById(id);
- /**
- * Returns a cluster node specified by its network address in the 'host:port' format.
- *
- * @param addr The network address.
- * @return The node object; {@code null} if the node has not been discovered or is offline.
- */
- @Nullable ClusterNode getByAddress(NetworkAddress addr);
+ return node != null ? node.name() : null;
+ }
/**
* Returns a cluster node specified by its consistent ID.
diff --git a/modules/api/src/main/java/org/apache/ignite/network/TopologyService.java b/modules/api/src/main/java/org/apache/ignite/network/TopologyService.java
index 13b647e92d..14cf9147fe 100644
--- a/modules/api/src/main/java/org/apache/ignite/network/TopologyService.java
+++ b/modules/api/src/main/java/org/apache/ignite/network/TopologyService.java
@@ -24,7 +24,7 @@ import org.jetbrains.annotations.Nullable;
* Entry point for obtaining physical cluster topology information.
*/
// TODO: allow removing event handlers, see https://issues.apache.org/jira/browse/IGNITE-14519
-public interface TopologyService {
+public interface TopologyService extends ClusterNodeResolver {
/**
* Returns information about the current node.
*
@@ -53,20 +53,4 @@ public interface TopologyService {
* @return The node object; {@code null} if the node has not been discovered or is offline.
*/
@Nullable ClusterNode getByAddress(NetworkAddress addr);
-
- /**
- * Returns a cluster node specified by its consistent ID.
- *
- * @param consistentId Consistent ID.
- * @return The node object; {@code null} if the node has not been discovered or is offline.
- */
- @Nullable ClusterNode getByConsistentId(String consistentId);
-
- /**
- * Returns a cluster node specified by its ID.
- *
- * @param id Node ID.
- * @return The node object; {@code null} if the node has not been discovered or is offline.
- */
- @Nullable ClusterNode getById(String id);
}
diff --git a/modules/api/src/testFixtures/java/org/apache/ignite/network/SingleClusterNodeResolver.java b/modules/api/src/testFixtures/java/org/apache/ignite/network/SingleClusterNodeResolver.java
new file mode 100644
index 0000000000..57b744f9aa
--- /dev/null
+++ b/modules/api/src/testFixtures/java/org/apache/ignite/network/SingleClusterNodeResolver.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.network;
+
+/**
+ * A {@link ClusterNodeResolver} for tests that returns one fixed {@link ClusterNode} for every lookup, whatever ID is requested.
+ */
+public class SingleClusterNodeResolver implements ClusterNodeResolver {
+
+ private final ClusterNode clusterNode;
+
+ /**
+ * Constructor.
+ *
+ * @param clusterNode Cluster node that will be returned by every lookup, regardless of the requested ID.
+ */
+ public SingleClusterNodeResolver(ClusterNode clusterNode) {
+ this.clusterNode = clusterNode;
+ }
+
+ @Override
+ public ClusterNode getByConsistentId(String consistentId) {
+ return clusterNode; // The consistent ID argument is ignored.
+ }
+
+ @Override
+ public ClusterNode getById(String id) {
+ return clusterNode; // The node ID argument is ignored.
+ }
+}
diff --git a/modules/sql-engine/build.gradle b/modules/sql-engine/build.gradle
index 57d906a1a5..d80df9f27f 100644
--- a/modules/sql-engine/build.gradle
+++ b/modules/sql-engine/build.gradle
@@ -84,6 +84,7 @@ dependencies {
testImplementation project(':ignite-vault')
testImplementation project(':ignite-placement-driver')
testImplementation libs.jmh.core
+ testImplementation(testFixtures(project(':ignite-api')))
testImplementation(testFixtures(project(':ignite-core')))
testImplementation(testFixtures(project(':ignite-configuration')))
testImplementation(testFixtures(project(':ignite-schema')))
diff --git a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/TableScanNodeExecutionTest.java b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/TableScanNodeExecutionTest.java
index 29497d3a3a..66c3761a8c 100644
--- a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/TableScanNodeExecutionTest.java
+++ b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/TableScanNodeExecutionTest.java
@@ -74,6 +74,7 @@ import org.apache.ignite.network.ClusterNodeImpl;
import org.apache.ignite.network.ClusterService;
import org.apache.ignite.network.MessagingService;
import org.apache.ignite.network.NetworkAddress;
+import org.apache.ignite.network.SingleClusterNodeResolver;
import org.apache.ignite.network.TopologyService;
import org.jetbrains.annotations.Nullable;
import org.junit.jupiter.api.AfterEach;
@@ -207,7 +208,7 @@ public class TableScanNodeExecutionTest extends AbstractExecutionTest<Object[]>
1,
Int2ObjectMaps.singleton(0, mock(RaftGroupService.class)),
PART_CNT,
- addr -> mock(ClusterNode.class),
+ new SingleClusterNodeResolver(mock(ClusterNode.class)),
txManager,
mock(MvTableStorage.class),
mock(TxStateTableStorage.class),
diff --git a/modules/table/build.gradle b/modules/table/build.gradle
index 891d7e396e..55382658cd 100644
--- a/modules/table/build.gradle
+++ b/modules/table/build.gradle
@@ -60,6 +60,7 @@ dependencies {
testImplementation project(':ignite-storage-rocksdb')
testImplementation project(':ignite-placement-driver-api')
testImplementation project(':ignite-system-view-api')
+ testImplementation(testFixtures(project(':ignite-api')))
testImplementation(testFixtures(project(':ignite-core')))
testImplementation(testFixtures(project(':ignite-schema')))
testImplementation(testFixtures(project(':ignite-configuration')))
@@ -94,6 +95,7 @@ dependencies {
testFixturesImplementation project(':ignite-affinity')
testFixturesImplementation project(':ignite-configuration-api')
testFixturesImplementation(testFixtures(project(':ignite-configuration')))
+ testFixturesImplementation(testFixtures(project(':ignite-api')))
testFixturesImplementation(testFixtures(project(':ignite-core')))
testFixturesImplementation(testFixtures(project(':ignite-storage-api')))
testFixturesImplementation(testFixtures(project(':ignite-transactions')))
@@ -122,6 +124,7 @@ dependencies {
integrationTestImplementation project(':ignite-page-memory')
integrationTestImplementation project(':ignite-storage-page-memory')
integrationTestImplementation(testFixtures(project))
+ integrationTestImplementation(testFixtures(project(':ignite-api')))
integrationTestImplementation(testFixtures(project(':ignite-core')))
integrationTestImplementation(testFixtures(project(':ignite-table')))
integrationTestImplementation(testFixtures(project(':ignite-schema')))
diff --git a/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/ItColocationTest.java b/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/ItColocationTest.java
index 90b1839ea1..fec00c73a3 100644
--- a/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/ItColocationTest.java
+++ b/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/ItColocationTest.java
@@ -103,6 +103,7 @@ import org.apache.ignite.internal.util.CollectionUtils;
import org.apache.ignite.network.ClusterNode;
import org.apache.ignite.network.ClusterService;
import org.apache.ignite.network.MessagingService;
+import org.apache.ignite.network.SingleClusterNodeResolver;
import org.apache.ignite.table.Tuple;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
@@ -266,7 +267,7 @@ public class ItColocationTest extends BaseIgniteAbstractTest {
tblId,
partRafts,
PARTS,
- name -> clusterNode,
+ new SingleClusterNodeResolver(clusterNode),
txManager,
mock(MvTableStorage.class),
new TestTxStateTableStorage(),
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/PartitionReplicatorNodeRecovery.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/PartitionReplicatorNodeRecovery.java
index a04956ff0a..06feb00559 100644
--- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/PartitionReplicatorNodeRecovery.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/PartitionReplicatorNodeRecovery.java
@@ -33,7 +33,6 @@ import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
-import java.util.function.Function;
import java.util.function.IntFunction;
import org.apache.ignite.internal.affinity.Assignment;
import org.apache.ignite.internal.lang.IgniteInternalException;
@@ -71,9 +70,6 @@ class PartitionReplicatorNodeRecovery {
private final TopologyService topologyService;
- /** Resolver that resolves a node consistent ID to cluster node. */
- private final Function<String, ClusterNode> clusterNodeResolver;
-
/** Obtains a TableImpl instance by a table ID. */
private final IntFunction<TableViewInternal> tableById;
@@ -81,13 +77,11 @@ class PartitionReplicatorNodeRecovery {
MetaStorageManager metaStorageManager,
MessagingService messagingService,
TopologyService topologyService,
- Function<String, ClusterNode> clusterNodeResolver,
IntFunction<TableViewInternal> tableById
) {
this.metaStorageManager = metaStorageManager;
this.messagingService = messagingService;
this.topologyService = topologyService;
- this.clusterNodeResolver = clusterNodeResolver;
this.tableById = tableById;
}
@@ -216,7 +210,7 @@ class PartitionReplicatorNodeRecovery {
Map<String, ClusterNode> peerNodesByConsistentIds = new ConcurrentHashMap<>();
for (Peer peer : peers) {
- ClusterNode node = clusterNodeResolver.apply(peer.consistentId());
+ ClusterNode node = topologyService.getByConsistentId(peer.consistentId());
if (node != null) {
peerNodesByConsistentIds.put(peer.consistentId(), node);
@@ -247,7 +241,7 @@ class PartitionReplicatorNodeRecovery {
// Check again for peers that could appear in the topology since last check, but before we installed the handler.
for (Peer peer : peers) {
if (!peerNodesByConsistentIds.containsKey(peer.consistentId())) {
- ClusterNode node = clusterNodeResolver.apply(peer.consistentId());
+ ClusterNode node = topologyService.getByConsistentId(peer.consistentId());
if (node != null) {
peerNodesByConsistentIds.put(peer.consistentId(), node);
@@ -284,7 +278,7 @@ class PartitionReplicatorNodeRecovery {
//noinspection unchecked
CompletableFuture<Boolean>[] requestFutures = peers.stream()
.map(Peer::consistentId)
- .map(clusterNodeResolver)
+ .map(topologyService::getByConsistentId)
.filter(Objects::nonNull)
.map(node -> messagingService
.invoke(node, request, QUERY_DATA_NODES_COUNT_TIMEOUT_MILLIS)
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
index b46d5fca24..b5195f854b 100644
--- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
@@ -263,9 +263,6 @@ public class TableManager implements IgniteTablesInternal, IgniteComponent {
/** Started tables. */
private final Map<Integer, TableImpl> startedTables = new ConcurrentHashMap<>();
- /** Resolver that resolves a node consistent ID to cluster node. */
- private final Function<String, ClusterNode> clusterNodeResolver;
-
/** Busy lock to stop synchronously. */
private final IgniteSpinBusyLock busyLock = new IgniteSpinBusyLock();
@@ -414,14 +411,11 @@ public class TableManager implements IgniteTablesInternal, IgniteComponent {
TopologyService topologyService = clusterService.topologyService();
- clusterNodeResolver = topologyService::getByConsistentId;
-
transactionStateResolver = new TransactionStateResolver(
replicaSvc,
txManager,
clock,
- clusterNodeResolver,
- topologyService::getById,
+ topologyService,
clusterService.messagingService()
);
@@ -480,7 +474,6 @@ public class TableManager implements IgniteTablesInternal, IgniteComponent {
metaStorageMgr,
clusterService.messagingService(),
topologyService,
- clusterNodeResolver,
tableId -> latestTablesById().get(tableId)
);
@@ -1123,7 +1116,7 @@ public class TableManager implements IgniteTablesInternal, IgniteComponent {
tableName,
tableId,
new Int2ObjectOpenHashMap<>(partitions),
- partitions, clusterNodeResolver, txManager, tableStorage,
+ partitions, clusterService.topologyService(), txManager, tableStorage,
txStateStorage, replicaSvc, clock, observableTimestampTracker, placementDriver);
var table = new TableImpl(internalTable, lockMgr, schemaVersions);
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/TransactionStateResolver.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/TransactionStateResolver.java
index f520cc3f0a..73c92af2a5 100644
--- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/TransactionStateResolver.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/TransactionStateResolver.java
@@ -30,7 +30,6 @@ import java.util.Objects;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
-import java.util.function.Function;
import org.apache.ignite.internal.hlc.HybridClock;
import org.apache.ignite.internal.hlc.HybridTimestamp;
import org.apache.ignite.internal.lang.IgniteInternalException;
@@ -46,6 +45,7 @@ import org.apache.ignite.internal.tx.message.TxStateCommitPartitionRequest;
import org.apache.ignite.internal.tx.message.TxStateCoordinatorRequest;
import org.apache.ignite.internal.tx.message.TxStateResponse;
import org.apache.ignite.network.ClusterNode;
+import org.apache.ignite.network.ClusterNodeResolver;
import org.apache.ignite.network.MessagingService;
import org.apache.ignite.network.NetworkMessage;
import org.jetbrains.annotations.Nullable;
@@ -66,13 +66,10 @@ public class TransactionStateResolver {
/** Replication service. */
private final ReplicaService replicaService;
- /** Function that resolves a node consistent ID to a cluster node. */
- private final Function<String, ClusterNode> clusterNodeResolver;
-
// TODO https://issues.apache.org/jira/browse/IGNITE-20408 after this ticket this resolver will be no longer needed, as
// TODO we will store coordinator as ClusterNode in local tx state map.
- /** Function that resolves a node non-consistent ID to a cluster node. */
- private final Function<String, ClusterNode> clusterNodeResolverById;
+ /** Resolver that resolves a node ID or a node consistent ID to a cluster node. */
+ private final ClusterNodeResolver clusterNodeResolver;
private final Map<UUID, CompletableFuture<TransactionMeta>> txStateFutures = new ConcurrentHashMap<>();
@@ -89,22 +86,19 @@ public class TransactionStateResolver {
* @param txManager Transaction manager.
* @param clock Node clock.
* @param clusterNodeResolver Cluster node resolver.
- * @param clusterNodeResolverById Cluster node resolver using non-consistent id.
* @param messagingService Messaging service.
*/
public TransactionStateResolver(
ReplicaService replicaService,
TxManager txManager,
HybridClock clock,
- Function<String, ClusterNode> clusterNodeResolver,
- Function<String, ClusterNode> clusterNodeResolverById,
+ ClusterNodeResolver clusterNodeResolver,
MessagingService messagingService
) {
this.replicaService = replicaService;
this.txManager = txManager;
this.clock = clock;
this.clusterNodeResolver = clusterNodeResolver;
- this.clusterNodeResolverById = clusterNodeResolverById;
this.messagingService = messagingService;
}
@@ -214,7 +208,7 @@ public class TransactionStateResolver {
) {
updateLocalTxMapAfterDistributedStateResolved(txId, txMetaFuture);
- ClusterNode coordinator = clusterNodeResolverById.apply(coordinatorId);
+ ClusterNode coordinator = clusterNodeResolver.getById(coordinatorId);
if (coordinator == null) {
// This means the coordinator node have either left the cluster or restarted.
@@ -289,7 +283,7 @@ public class TransactionStateResolver {
TxStateCommitPartitionRequest request
) {
ClusterNode nodeToSend = primaryReplicaMapping.get(replicaGrp).stream()
- .map(clusterNodeResolver)
+ .map(clusterNodeResolver::getByConsistentId)
.filter(Objects::nonNull)
.findFirst()
.orElseThrow(() -> new IgniteInternalException("All replica nodes are unavailable"));
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java
index ed36b4836b..cfc03ef601 100644
--- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java
@@ -116,6 +116,7 @@ import org.apache.ignite.internal.util.PendingComparableValuesTracker;
import org.apache.ignite.internal.utils.PrimaryReplica;
import org.apache.ignite.lang.IgniteException;
import org.apache.ignite.network.ClusterNode;
+import org.apache.ignite.network.ClusterNodeResolver;
import org.apache.ignite.tx.TransactionException;
import org.jetbrains.annotations.Nullable;
import org.jetbrains.annotations.TestOnly;
@@ -148,7 +149,7 @@ public class InternalTableImpl implements InternalTable {
private final int tableId;
/** Resolver that resolves a node consistent ID to cluster node. */
- private final Function<String, ClusterNode> clusterNodeResolver;
+ private final ClusterNodeResolver clusterNodeResolver;
/** Transactional manager. */
protected final TxManager txManager;
@@ -202,7 +203,7 @@ public class InternalTableImpl implements InternalTable {
int tableId,
Int2ObjectMap<RaftGroupService> partMap,
int partitions,
- Function<String, ClusterNode> clusterNodeResolver,
+ ClusterNodeResolver clusterNodeResolver,
TxManager txManager,
MvTableStorage tableStorage,
TxStateTableStorage txStateStorage,
@@ -612,7 +613,7 @@ public class InternalTableImpl implements InternalTable {
CompletableFuture<R> fut = primaryReplicaFuture.thenCompose(primaryReplica -> {
try {
- ClusterNode node = clusterNodeResolver.apply(primaryReplica.getLeaseholder());
+ ClusterNode node = clusterNodeResolver.getByConsistentId(primaryReplica.getLeaseholder());
if (node == null) {
throw new TransactionException(REPLICA_UNAVAILABLE_ERR, "Failed to resolve the primary replica node [consistentId="
@@ -663,7 +664,7 @@ public class InternalTableImpl implements InternalTable {
CompletableFuture<R> fut = primaryReplicaFuture.thenCompose(primaryReplica -> {
try {
- ClusterNode node = clusterNodeResolver.apply(primaryReplica.getLeaseholder());
+ ClusterNode node = clusterNodeResolver.getByConsistentId(primaryReplica.getLeaseholder());
if (node == null) {
throw new TransactionException(REPLICA_UNAVAILABLE_ERR, "Failed to resolve the primary replica node [consistentId="
@@ -1496,7 +1497,7 @@ public class InternalTableImpl implements InternalTable {
throw new IgniteInternalException("No such partition " + partition + " in table " + tableName);
}
- return clusterNodeResolver.apply(raftGroupService.leader().consistentId());
+ return clusterNodeResolver.getByConsistentId(raftGroupService.leader().consistentId());
}
/** {@inheritDoc} */
@@ -1699,7 +1700,7 @@ public class InternalTableImpl implements InternalTable {
+ " [tablePartitionId=" + tablePartitionId + ", awaitTimestamp=" + now + ']', e);
}
- ClusterNode node = clusterNodeResolver.apply(primaryReplica.getLeaseholder());
+ ClusterNode node = clusterNodeResolver.getByConsistentId(primaryReplica.getLeaseholder());
if (node == null) {
throw new TransactionException(REPLICA_UNAVAILABLE_ERR, "Failed to resolve the primary replica node [consistentId="
@@ -1904,7 +1905,7 @@ public class InternalTableImpl implements InternalTable {
if (res == null) {
throw withCause(TransactionException::new, REPLICA_UNAVAILABLE_ERR, e);
} else {
- return clusterNodeResolver.apply(res.getLeaseholder());
+ return clusterNodeResolver.getByConsistentId(res.getLeaseholder());
}
}
});
diff --git a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerTest.java b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerTest.java
index 45388b4192..c86295cca2 100644
--- a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerTest.java
+++ b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerTest.java
@@ -174,6 +174,7 @@ import org.apache.ignite.internal.util.PendingComparableValuesTracker;
import org.apache.ignite.lang.ErrorGroups.Transactions;
import org.apache.ignite.network.ClusterNode;
import org.apache.ignite.network.ClusterNodeImpl;
+import org.apache.ignite.network.ClusterNodeResolver;
import org.apache.ignite.network.MessagingService;
import org.apache.ignite.network.NetworkAddress;
import org.apache.ignite.network.TopologyService;
@@ -451,12 +452,23 @@ public class PartitionReplicaListenerTest extends IgniteAbstractTest {
return CompletableFuture.failedFuture(new Exception("Test exception"));
}).when(messagingService).invoke(any(ClusterNode.class), any(), anyLong());
+ ClusterNodeResolver clusterNodeResolver = new ClusterNodeResolver() {
+ @Override
+ public ClusterNode getById(String id) {
+ return id.equals(localNode.id()) ? localNode : anotherNode;
+ }
+
+ @Override
+ public ClusterNode getByConsistentId(String consistentId) {
+ return consistentId.equals(localNode.name()) ? localNode : anotherNode;
+ }
+ };
+
transactionStateResolver = new TransactionStateResolver(
mock(ReplicaService.class),
txManager,
clock,
- consistentId -> consistentId.equals(localNode.name()) ? localNode : anotherNode,
- id -> id.equals(localNode.id()) ? localNode : anotherNode,
+ clusterNodeResolver,
messagingService
);
diff --git a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImplTest.java b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImplTest.java
index f06e86eede..9f128d6b59 100644
--- a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImplTest.java
+++ b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImplTest.java
@@ -49,6 +49,7 @@ import org.apache.ignite.internal.tx.TxManager;
import org.apache.ignite.internal.tx.storage.state.TxStateTableStorage;
import org.apache.ignite.internal.util.PendingComparableValuesTracker;
import org.apache.ignite.network.ClusterNode;
+import org.apache.ignite.network.SingleClusterNodeResolver;
import org.junit.jupiter.api.Test;
/**
@@ -62,7 +63,7 @@ public class InternalTableImplTest extends BaseIgniteAbstractTest {
1,
Int2ObjectMaps.emptyMap(),
1,
- s -> mock(ClusterNode.class),
+ new SingleClusterNodeResolver(mock(ClusterNode.class)),
mock(TxManager.class),
mock(MvTableStorage.class),
mock(TxStateTableStorage.class),
@@ -108,7 +109,7 @@ public class InternalTableImplTest extends BaseIgniteAbstractTest {
1,
Int2ObjectMaps.emptyMap(),
3,
- s -> mock(ClusterNode.class),
+ new SingleClusterNodeResolver(mock(ClusterNode.class)),
mock(TxManager.class),
mock(MvTableStorage.class),
mock(TxStateTableStorage.class),
diff --git a/modules/table/src/testFixtures/java/org/apache/ignite/distributed/ItTxTestCluster.java b/modules/table/src/testFixtures/java/org/apache/ignite/distributed/ItTxTestCluster.java
index e54eedd99e..bd3ea99b3c 100644
--- a/modules/table/src/testFixtures/java/org/apache/ignite/distributed/ItTxTestCluster.java
+++ b/modules/table/src/testFixtures/java/org/apache/ignite/distributed/ItTxTestCluster.java
@@ -54,7 +54,6 @@ import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
-import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.IntStream;
import org.apache.ignite.internal.affinity.AffinityUtils;
@@ -132,6 +131,7 @@ import org.apache.ignite.internal.util.IgniteUtils;
import org.apache.ignite.internal.util.Lazy;
import org.apache.ignite.internal.util.PendingComparableValuesTracker;
import org.apache.ignite.network.ClusterNode;
+import org.apache.ignite.network.ClusterNodeResolver;
import org.apache.ignite.network.ClusterService;
import org.apache.ignite.network.NetworkAddress;
import org.apache.ignite.network.NodeFinder;
@@ -140,6 +140,7 @@ import org.apache.ignite.raft.jraft.RaftMessagesFactory;
import org.apache.ignite.raft.jraft.rpc.impl.RaftGroupEventsClientListener;
import org.apache.ignite.tx.IgniteTransactions;
import org.apache.ignite.utils.ClusterServiceTestUtils;
+import org.jetbrains.annotations.Nullable;
import org.junit.jupiter.api.TestInfo;
/**
@@ -206,32 +207,37 @@ public class ItTxTestCluster {
protected String localNodeName;
- private final Function<String, ClusterNode> consistentIdToNode = consistentId -> {
- for (ClusterService service : cluster) {
- ClusterNode clusterNode = service.topologyService().localMember();
+ private final ClusterNodeResolver nodeResolver = new ClusterNodeResolver() {
- if (clusterNode.name().equals(consistentId)) {
- return clusterNode;
+ @Override
+ public @Nullable ClusterNode getById(String id) {
+ for (ClusterService service : cluster) {
+ ClusterNode clusterNode = service.topologyService().localMember();
+
+ if (clusterNode.id().equals(id)) {
+ return clusterNode;
+ }
}
- }
- return null;
- };
+ if (client != null && client.topologyService().localMember().id().equals(id)) {
+ return client.topologyService().localMember();
+ }
- private final Function<String, ClusterNode> idToNode = id -> {
- for (ClusterService service : cluster) {
- ClusterNode clusterNode = service.topologyService().localMember();
+ return null;
+ }
+
+ @Override
+ public @Nullable ClusterNode getByConsistentId(String consistentId) {
+ for (ClusterService service : cluster) {
+ ClusterNode clusterNode = service.topologyService().localMember();
- if (clusterNode.id().equals(id)) {
- return clusterNode;
+ if (clusterNode.name().equals(consistentId)) {
+ return clusterNode;
+ }
}
- }
- if (client != null && client.topologyService().localMember().id().equals(id)) {
- return client.topologyService().localMember();
+ return null;
}
-
- return null;
};
private final TestInfo testInfo;
@@ -462,8 +468,7 @@ public class ItTxTestCluster {
replicaServices.get(assignment),
txManagers.get(assignment),
clocks.get(assignment),
- consistentIdToNode,
- idToNode,
+ nodeResolver,
clusterServices.get(assignment).messagingService()
);
transactionStateResolver.start();
@@ -552,7 +557,7 @@ public class ItTxTestCluster {
transactionStateResolver,
storageUpdateHandler,
new DummyValidationSchemasSource(schemaManager),
- consistentIdToNode.apply(assignment),
+ nodeResolver.getByConsistentId(assignment),
new AlwaysSyncedSchemaSyncService(),
catalogService,
placementDriver
@@ -617,7 +622,7 @@ public class ItTxTestCluster {
tableId,
clients,
1,
- consistentIdToNode,
+ nodeResolver,
clientTxManager,
mock(MvTableStorage.class),
mock(TxStateTableStorage.class),
@@ -839,8 +844,7 @@ public class ItTxTestCluster {
clientReplicaSvc,
clientTxManager,
clientClock,
- consistentIdToNode,
- idToNode,
+ nodeResolver,
client.messagingService()
);
diff --git a/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/impl/DummyInternalTableImpl.java b/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/impl/DummyInternalTableImpl.java
index e6e87bcccb..fe6fa1ceae 100644
--- a/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/impl/DummyInternalTableImpl.java
+++ b/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/impl/DummyInternalTableImpl.java
@@ -102,6 +102,7 @@ import org.apache.ignite.network.ClusterNodeImpl;
import org.apache.ignite.network.ClusterService;
import org.apache.ignite.network.NetworkAddress;
import org.apache.ignite.network.NetworkMessage;
+import org.apache.ignite.network.SingleClusterNodeResolver;
import org.apache.ignite.network.TopologyService;
import org.apache.ignite.tx.TransactionException;
import org.jetbrains.annotations.Nullable;
@@ -232,7 +233,7 @@ public class DummyInternalTableImpl extends InternalTableImpl {
nextTableId.getAndIncrement(),
Int2ObjectMaps.singleton(PART_ID, mock(RaftGroupService.class)),
1,
- name -> LOCAL_NODE,
+ new SingleClusterNodeResolver(LOCAL_NODE),
txManager,
mock(MvTableStorage.class),
new TestTxStateTableStorage(),