You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by vp...@apache.org on 2023/12/12 12:15:39 UTC

(ignite-3) branch main updated: IGNITE-21060 Extract ClusterNodeResolver as a separate entity (#2946)

This is an automated email from the ASF dual-hosted git repository.

vpyatkov pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new 492f01b642 IGNITE-21060 Extract ClusterNodeResolver as a separate entity (#2946)
492f01b642 is described below

commit 492f01b6423c3514cbe4b3ac45b3d84e70080318
Author: Cyrill <cy...@gmail.com>
AuthorDate: Tue Dec 12 15:15:33 2023 +0300

    IGNITE-21060 Extract ClusterNodeResolver as a separate entity (#2946)
---
 ...pologyService.java => ClusterNodeResolver.java} | 37 ++++-----------
 .../org/apache/ignite/network/TopologyService.java | 18 +-------
 .../ignite/network/SingleClusterNodeResolver.java  | 45 ++++++++++++++++++
 modules/sql-engine/build.gradle                    |  1 +
 .../exec/rel/TableScanNodeExecutionTest.java       |  3 +-
 modules/table/build.gradle                         |  3 ++
 .../ignite/internal/table/ItColocationTest.java    |  3 +-
 .../PartitionReplicatorNodeRecovery.java           | 12 ++---
 .../internal/table/distributed/TableManager.java   | 11 +----
 .../replicator/TransactionStateResolver.java       | 18 +++-----
 .../distributed/storage/InternalTableImpl.java     | 15 +++---
 .../replication/PartitionReplicaListenerTest.java  | 16 ++++++-
 .../distributed/storage/InternalTableImplTest.java |  5 +-
 .../apache/ignite/distributed/ItTxTestCluster.java | 54 ++++++++++++----------
 .../table/impl/DummyInternalTableImpl.java         |  3 +-
 15 files changed, 130 insertions(+), 114 deletions(-)

diff --git a/modules/api/src/main/java/org/apache/ignite/network/TopologyService.java b/modules/api/src/main/java/org/apache/ignite/network/ClusterNodeResolver.java
similarity index 57%
copy from modules/api/src/main/java/org/apache/ignite/network/TopologyService.java
copy to modules/api/src/main/java/org/apache/ignite/network/ClusterNodeResolver.java
index 13b647e92d..773b369b1d 100644
--- a/modules/api/src/main/java/org/apache/ignite/network/TopologyService.java
+++ b/modules/api/src/main/java/org/apache/ignite/network/ClusterNodeResolver.java
@@ -17,42 +17,23 @@
 
 package org.apache.ignite.network;
 
-import java.util.Collection;
 import org.jetbrains.annotations.Nullable;
 
 /**
- * Entry point for obtaining physical cluster topology information.
+ * Resolves cluster nodes by their node ID or consistent ID.
  */
-// TODO: allow removing event handlers, see https://issues.apache.org/jira/browse/IGNITE-14519
-public interface TopologyService {
+public interface ClusterNodeResolver {
     /**
-     * Returns information about the current node.
+     * Returns a cluster node consistent ID by its node ID.
      *
-     * @return Information about the local network member.
-     */
-    ClusterNode localMember();
-
-    /**
-     * Returns a list of all discovered cluster members, including the local member itself.
-     *
-     * @return List of the discovered cluster members.
-     */
-    Collection<ClusterNode> allMembers();
-
-    /**
-     * Registers a handler for physical topology change events.
-     *
-     * @param handler Physical topology event handler.
+     * @param id Node ID.
+     * @return The consistent ID; {@code null} if the node has not been discovered or is offline.
      */
-    void addEventHandler(TopologyEventHandler handler);
+    default @Nullable String getConsistentIdById(String id) {
+        ClusterNode node = getById(id);
 
-    /**
-     * Returns a cluster node specified by its network address in the 'host:port' format.
-     *
-     * @param addr The network address.
-     * @return The node object; {@code null} if the node has not been discovered or is offline.
-     */
-    @Nullable ClusterNode getByAddress(NetworkAddress addr);
+        return node != null ? node.name() : null;
+    }
 
     /**
      * Returns a cluster node specified by its consistent ID.
diff --git a/modules/api/src/main/java/org/apache/ignite/network/TopologyService.java b/modules/api/src/main/java/org/apache/ignite/network/TopologyService.java
index 13b647e92d..14cf9147fe 100644
--- a/modules/api/src/main/java/org/apache/ignite/network/TopologyService.java
+++ b/modules/api/src/main/java/org/apache/ignite/network/TopologyService.java
@@ -24,7 +24,7 @@ import org.jetbrains.annotations.Nullable;
  * Entry point for obtaining physical cluster topology information.
  */
 // TODO: allow removing event handlers, see https://issues.apache.org/jira/browse/IGNITE-14519
-public interface TopologyService {
+public interface TopologyService extends ClusterNodeResolver {
     /**
      * Returns information about the current node.
      *
@@ -53,20 +53,4 @@ public interface TopologyService {
      * @return The node object; {@code null} if the node has not been discovered or is offline.
      */
     @Nullable ClusterNode getByAddress(NetworkAddress addr);
-
-    /**
-     * Returns a cluster node specified by its consistent ID.
-     *
-     * @param consistentId Consistent ID.
-     * @return The node object; {@code null} if the node has not been discovered or is offline.
-     */
-    @Nullable ClusterNode getByConsistentId(String consistentId);
-
-    /**
-     * Returns a cluster node specified by its ID.
-     *
-     * @param id Node ID.
-     * @return The node object; {@code null} if the node has not been discovered or is offline.
-     */
-    @Nullable ClusterNode getById(String id);
 }
diff --git a/modules/api/src/testFixtures/java/org/apache/ignite/network/SingleClusterNodeResolver.java b/modules/api/src/testFixtures/java/org/apache/ignite/network/SingleClusterNodeResolver.java
new file mode 100644
index 0000000000..57b744f9aa
--- /dev/null
+++ b/modules/api/src/testFixtures/java/org/apache/ignite/network/SingleClusterNodeResolver.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.network;
+
+/**
+ * A class that returns a single {@link ClusterNode} for every request.
+ */
+public class SingleClusterNodeResolver implements ClusterNodeResolver {
+
+    private final ClusterNode clusterNode;
+
+    /**
+     * Constructor.
+     *
+     * @param clusterNode Default cluster node that will be returned as a result of all method calls.
+     */
+    public SingleClusterNodeResolver(ClusterNode clusterNode) {
+        this.clusterNode = clusterNode;
+    }
+
+    @Override
+    public ClusterNode getByConsistentId(String consistentId) {
+        return clusterNode;
+    }
+
+    @Override
+    public ClusterNode getById(String id) {
+        return clusterNode;
+    }
+}
diff --git a/modules/sql-engine/build.gradle b/modules/sql-engine/build.gradle
index 57d906a1a5..d80df9f27f 100644
--- a/modules/sql-engine/build.gradle
+++ b/modules/sql-engine/build.gradle
@@ -84,6 +84,7 @@ dependencies {
     testImplementation project(':ignite-vault')
     testImplementation project(':ignite-placement-driver')
     testImplementation libs.jmh.core
+    testImplementation(testFixtures(project(':ignite-api')))
     testImplementation(testFixtures(project(':ignite-core')))
     testImplementation(testFixtures(project(':ignite-configuration')))
     testImplementation(testFixtures(project(':ignite-schema')))
diff --git a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/TableScanNodeExecutionTest.java b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/TableScanNodeExecutionTest.java
index 29497d3a3a..66c3761a8c 100644
--- a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/TableScanNodeExecutionTest.java
+++ b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/TableScanNodeExecutionTest.java
@@ -74,6 +74,7 @@ import org.apache.ignite.network.ClusterNodeImpl;
 import org.apache.ignite.network.ClusterService;
 import org.apache.ignite.network.MessagingService;
 import org.apache.ignite.network.NetworkAddress;
+import org.apache.ignite.network.SingleClusterNodeResolver;
 import org.apache.ignite.network.TopologyService;
 import org.jetbrains.annotations.Nullable;
 import org.junit.jupiter.api.AfterEach;
@@ -207,7 +208,7 @@ public class TableScanNodeExecutionTest extends AbstractExecutionTest<Object[]>
                     1,
                     Int2ObjectMaps.singleton(0, mock(RaftGroupService.class)),
                     PART_CNT,
-                    addr -> mock(ClusterNode.class),
+                    new SingleClusterNodeResolver(mock(ClusterNode.class)),
                     txManager,
                     mock(MvTableStorage.class),
                     mock(TxStateTableStorage.class),
diff --git a/modules/table/build.gradle b/modules/table/build.gradle
index 891d7e396e..55382658cd 100644
--- a/modules/table/build.gradle
+++ b/modules/table/build.gradle
@@ -60,6 +60,7 @@ dependencies {
     testImplementation project(':ignite-storage-rocksdb')
     testImplementation project(':ignite-placement-driver-api')
     testImplementation project(':ignite-system-view-api')
+    testImplementation(testFixtures(project(':ignite-api')))
     testImplementation(testFixtures(project(':ignite-core')))
     testImplementation(testFixtures(project(':ignite-schema')))
     testImplementation(testFixtures(project(':ignite-configuration')))
@@ -94,6 +95,7 @@ dependencies {
     testFixturesImplementation project(':ignite-affinity')
     testFixturesImplementation project(':ignite-configuration-api')
     testFixturesImplementation(testFixtures(project(':ignite-configuration')))
+    testFixturesImplementation(testFixtures(project(':ignite-api')))
     testFixturesImplementation(testFixtures(project(':ignite-core')))
     testFixturesImplementation(testFixtures(project(':ignite-storage-api')))
     testFixturesImplementation(testFixtures(project(':ignite-transactions')))
@@ -122,6 +124,7 @@ dependencies {
     integrationTestImplementation project(':ignite-page-memory')
     integrationTestImplementation project(':ignite-storage-page-memory')
     integrationTestImplementation(testFixtures(project))
+    integrationTestImplementation(testFixtures(project(':ignite-api')))
     integrationTestImplementation(testFixtures(project(':ignite-core')))
     integrationTestImplementation(testFixtures(project(':ignite-table')))
     integrationTestImplementation(testFixtures(project(':ignite-schema')))
diff --git a/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/ItColocationTest.java b/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/ItColocationTest.java
index 90b1839ea1..fec00c73a3 100644
--- a/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/ItColocationTest.java
+++ b/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/ItColocationTest.java
@@ -103,6 +103,7 @@ import org.apache.ignite.internal.util.CollectionUtils;
 import org.apache.ignite.network.ClusterNode;
 import org.apache.ignite.network.ClusterService;
 import org.apache.ignite.network.MessagingService;
+import org.apache.ignite.network.SingleClusterNodeResolver;
 import org.apache.ignite.table.Tuple;
 import org.junit.jupiter.api.AfterAll;
 import org.junit.jupiter.api.BeforeAll;
@@ -266,7 +267,7 @@ public class ItColocationTest extends BaseIgniteAbstractTest {
                 tblId,
                 partRafts,
                 PARTS,
-                name -> clusterNode,
+                new SingleClusterNodeResolver(clusterNode),
                 txManager,
                 mock(MvTableStorage.class),
                 new TestTxStateTableStorage(),
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/PartitionReplicatorNodeRecovery.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/PartitionReplicatorNodeRecovery.java
index a04956ff0a..06feb00559 100644
--- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/PartitionReplicatorNodeRecovery.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/PartitionReplicatorNodeRecovery.java
@@ -33,7 +33,6 @@ import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
-import java.util.function.Function;
 import java.util.function.IntFunction;
 import org.apache.ignite.internal.affinity.Assignment;
 import org.apache.ignite.internal.lang.IgniteInternalException;
@@ -71,9 +70,6 @@ class PartitionReplicatorNodeRecovery {
 
     private final TopologyService topologyService;
 
-    /** Resolver that resolves a node consistent ID to cluster node. */
-    private final Function<String, ClusterNode> clusterNodeResolver;
-
     /** Obtains a TableImpl instance by a table ID. */
     private final IntFunction<TableViewInternal> tableById;
 
@@ -81,13 +77,11 @@ class PartitionReplicatorNodeRecovery {
             MetaStorageManager metaStorageManager,
             MessagingService messagingService,
             TopologyService topologyService,
-            Function<String, ClusterNode> clusterNodeResolver,
             IntFunction<TableViewInternal> tableById
     ) {
         this.metaStorageManager = metaStorageManager;
         this.messagingService = messagingService;
         this.topologyService = topologyService;
-        this.clusterNodeResolver = clusterNodeResolver;
         this.tableById = tableById;
     }
 
@@ -216,7 +210,7 @@ class PartitionReplicatorNodeRecovery {
         Map<String, ClusterNode> peerNodesByConsistentIds = new ConcurrentHashMap<>();
 
         for (Peer peer : peers) {
-            ClusterNode node = clusterNodeResolver.apply(peer.consistentId());
+            ClusterNode node = topologyService.getByConsistentId(peer.consistentId());
 
             if (node != null) {
                 peerNodesByConsistentIds.put(peer.consistentId(), node);
@@ -247,7 +241,7 @@ class PartitionReplicatorNodeRecovery {
         // Check again for peers that could appear in the topology since last check, but before we installed the handler.
         for (Peer peer : peers) {
             if (!peerNodesByConsistentIds.containsKey(peer.consistentId())) {
-                ClusterNode node = clusterNodeResolver.apply(peer.consistentId());
+                ClusterNode node = topologyService.getByConsistentId(peer.consistentId());
 
                 if (node != null) {
                     peerNodesByConsistentIds.put(peer.consistentId(), node);
@@ -284,7 +278,7 @@ class PartitionReplicatorNodeRecovery {
         //noinspection unchecked
         CompletableFuture<Boolean>[] requestFutures = peers.stream()
                 .map(Peer::consistentId)
-                .map(clusterNodeResolver)
+                .map(topologyService::getByConsistentId)
                 .filter(Objects::nonNull)
                 .map(node -> messagingService
                         .invoke(node, request, QUERY_DATA_NODES_COUNT_TIMEOUT_MILLIS)
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
index b46d5fca24..b5195f854b 100644
--- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
@@ -263,9 +263,6 @@ public class TableManager implements IgniteTablesInternal, IgniteComponent {
     /** Started tables. */
     private final Map<Integer, TableImpl> startedTables = new ConcurrentHashMap<>();
 
-    /** Resolver that resolves a node consistent ID to cluster node. */
-    private final Function<String, ClusterNode> clusterNodeResolver;
-
     /** Busy lock to stop synchronously. */
     private final IgniteSpinBusyLock busyLock = new IgniteSpinBusyLock();
 
@@ -414,14 +411,11 @@ public class TableManager implements IgniteTablesInternal, IgniteComponent {
 
         TopologyService topologyService = clusterService.topologyService();
 
-        clusterNodeResolver = topologyService::getByConsistentId;
-
         transactionStateResolver = new TransactionStateResolver(
                 replicaSvc,
                 txManager,
                 clock,
-                clusterNodeResolver,
-                topologyService::getById,
+                topologyService,
                 clusterService.messagingService()
         );
 
@@ -480,7 +474,6 @@ public class TableManager implements IgniteTablesInternal, IgniteComponent {
                 metaStorageMgr,
                 clusterService.messagingService(),
                 topologyService,
-                clusterNodeResolver,
                 tableId -> latestTablesById().get(tableId)
         );
 
@@ -1123,7 +1116,7 @@ public class TableManager implements IgniteTablesInternal, IgniteComponent {
                 tableName,
                 tableId,
                 new Int2ObjectOpenHashMap<>(partitions),
-                partitions, clusterNodeResolver, txManager, tableStorage,
+                partitions, clusterService.topologyService(), txManager, tableStorage,
                 txStateStorage, replicaSvc, clock, observableTimestampTracker, placementDriver);
 
         var table = new TableImpl(internalTable, lockMgr, schemaVersions);
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/TransactionStateResolver.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/TransactionStateResolver.java
index f520cc3f0a..73c92af2a5 100644
--- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/TransactionStateResolver.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/TransactionStateResolver.java
@@ -30,7 +30,6 @@ import java.util.Objects;
 import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
-import java.util.function.Function;
 import org.apache.ignite.internal.hlc.HybridClock;
 import org.apache.ignite.internal.hlc.HybridTimestamp;
 import org.apache.ignite.internal.lang.IgniteInternalException;
@@ -46,6 +45,7 @@ import org.apache.ignite.internal.tx.message.TxStateCommitPartitionRequest;
 import org.apache.ignite.internal.tx.message.TxStateCoordinatorRequest;
 import org.apache.ignite.internal.tx.message.TxStateResponse;
 import org.apache.ignite.network.ClusterNode;
+import org.apache.ignite.network.ClusterNodeResolver;
 import org.apache.ignite.network.MessagingService;
 import org.apache.ignite.network.NetworkMessage;
 import org.jetbrains.annotations.Nullable;
@@ -66,13 +66,10 @@ public class TransactionStateResolver {
     /** Replication service. */
     private final ReplicaService replicaService;
 
-    /** Function that resolves a node consistent ID to a cluster node. */
-    private final Function<String, ClusterNode> clusterNodeResolver;
-
     // TODO https://issues.apache.org/jira/browse/IGNITE-20408 after this ticket this resolver will be no longer needed, as
     // TODO we will store coordinator as ClusterNode in local tx state map.
-    /** Function that resolves a node non-consistent ID to a cluster node. */
-    private final Function<String, ClusterNode> clusterNodeResolverById;
+    /** Resolver that resolves a node ID or a node consistent ID to a cluster node. */
+    private final ClusterNodeResolver clusterNodeResolver;
 
     private final Map<UUID, CompletableFuture<TransactionMeta>> txStateFutures = new ConcurrentHashMap<>();
 
@@ -89,22 +86,19 @@ public class TransactionStateResolver {
      * @param txManager Transaction manager.
      * @param clock Node clock.
      * @param clusterNodeResolver Cluster node resolver.
-     * @param clusterNodeResolverById Cluster node resolver using non-consistent id.
      * @param messagingService Messaging service.
      */
     public TransactionStateResolver(
             ReplicaService replicaService,
             TxManager txManager,
             HybridClock clock,
-            Function<String, ClusterNode> clusterNodeResolver,
-            Function<String, ClusterNode> clusterNodeResolverById,
+            ClusterNodeResolver clusterNodeResolver,
             MessagingService messagingService
     ) {
         this.replicaService = replicaService;
         this.txManager = txManager;
         this.clock = clock;
         this.clusterNodeResolver = clusterNodeResolver;
-        this.clusterNodeResolverById = clusterNodeResolverById;
         this.messagingService = messagingService;
     }
 
@@ -214,7 +208,7 @@ public class TransactionStateResolver {
     ) {
         updateLocalTxMapAfterDistributedStateResolved(txId, txMetaFuture);
 
-        ClusterNode coordinator = clusterNodeResolverById.apply(coordinatorId);
+        ClusterNode coordinator = clusterNodeResolver.getById(coordinatorId);
 
         if (coordinator == null) {
             // This means the coordinator node have either left the cluster or restarted.
@@ -289,7 +283,7 @@ public class TransactionStateResolver {
             TxStateCommitPartitionRequest request
     ) {
         ClusterNode nodeToSend = primaryReplicaMapping.get(replicaGrp).stream()
-                .map(clusterNodeResolver)
+                .map(clusterNodeResolver::getByConsistentId)
                 .filter(Objects::nonNull)
                 .findFirst()
                 .orElseThrow(() -> new IgniteInternalException("All replica nodes are unavailable"));
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java
index ed36b4836b..cfc03ef601 100644
--- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java
@@ -116,6 +116,7 @@ import org.apache.ignite.internal.util.PendingComparableValuesTracker;
 import org.apache.ignite.internal.utils.PrimaryReplica;
 import org.apache.ignite.lang.IgniteException;
 import org.apache.ignite.network.ClusterNode;
+import org.apache.ignite.network.ClusterNodeResolver;
 import org.apache.ignite.tx.TransactionException;
 import org.jetbrains.annotations.Nullable;
 import org.jetbrains.annotations.TestOnly;
@@ -148,7 +149,7 @@ public class InternalTableImpl implements InternalTable {
     private final int tableId;
 
     /** Resolver that resolves a node consistent ID to cluster node. */
-    private final Function<String, ClusterNode> clusterNodeResolver;
+    private final ClusterNodeResolver clusterNodeResolver;
 
     /** Transactional manager. */
     protected final TxManager txManager;
@@ -202,7 +203,7 @@ public class InternalTableImpl implements InternalTable {
             int tableId,
             Int2ObjectMap<RaftGroupService> partMap,
             int partitions,
-            Function<String, ClusterNode> clusterNodeResolver,
+            ClusterNodeResolver clusterNodeResolver,
             TxManager txManager,
             MvTableStorage tableStorage,
             TxStateTableStorage txStateStorage,
@@ -612,7 +613,7 @@ public class InternalTableImpl implements InternalTable {
 
         CompletableFuture<R> fut = primaryReplicaFuture.thenCompose(primaryReplica -> {
             try {
-                ClusterNode node = clusterNodeResolver.apply(primaryReplica.getLeaseholder());
+                ClusterNode node = clusterNodeResolver.getByConsistentId(primaryReplica.getLeaseholder());
 
                 if (node == null) {
                     throw new TransactionException(REPLICA_UNAVAILABLE_ERR, "Failed to resolve the primary replica node [consistentId="
@@ -663,7 +664,7 @@ public class InternalTableImpl implements InternalTable {
 
         CompletableFuture<R>  fut = primaryReplicaFuture.thenCompose(primaryReplica -> {
             try {
-                ClusterNode node = clusterNodeResolver.apply(primaryReplica.getLeaseholder());
+                ClusterNode node = clusterNodeResolver.getByConsistentId(primaryReplica.getLeaseholder());
 
                 if (node == null) {
                     throw new TransactionException(REPLICA_UNAVAILABLE_ERR, "Failed to resolve the primary replica node [consistentId="
@@ -1496,7 +1497,7 @@ public class InternalTableImpl implements InternalTable {
             throw new IgniteInternalException("No such partition " + partition + " in table " + tableName);
         }
 
-        return clusterNodeResolver.apply(raftGroupService.leader().consistentId());
+        return clusterNodeResolver.getByConsistentId(raftGroupService.leader().consistentId());
     }
 
     /** {@inheritDoc} */
@@ -1699,7 +1700,7 @@ public class InternalTableImpl implements InternalTable {
                         + " [tablePartitionId=" + tablePartitionId + ", awaitTimestamp=" + now + ']', e);
             }
 
-            ClusterNode node = clusterNodeResolver.apply(primaryReplica.getLeaseholder());
+            ClusterNode node = clusterNodeResolver.getByConsistentId(primaryReplica.getLeaseholder());
 
             if (node == null) {
                 throw new TransactionException(REPLICA_UNAVAILABLE_ERR, "Failed to resolve the primary replica node [consistentId="
@@ -1904,7 +1905,7 @@ public class InternalTableImpl implements InternalTable {
                         if (res == null) {
                             throw withCause(TransactionException::new, REPLICA_UNAVAILABLE_ERR, e);
                         } else {
-                            return clusterNodeResolver.apply(res.getLeaseholder());
+                            return clusterNodeResolver.getByConsistentId(res.getLeaseholder());
                         }
                     }
                 });
diff --git a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerTest.java b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerTest.java
index 45388b4192..c86295cca2 100644
--- a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerTest.java
+++ b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerTest.java
@@ -174,6 +174,7 @@ import org.apache.ignite.internal.util.PendingComparableValuesTracker;
 import org.apache.ignite.lang.ErrorGroups.Transactions;
 import org.apache.ignite.network.ClusterNode;
 import org.apache.ignite.network.ClusterNodeImpl;
+import org.apache.ignite.network.ClusterNodeResolver;
 import org.apache.ignite.network.MessagingService;
 import org.apache.ignite.network.NetworkAddress;
 import org.apache.ignite.network.TopologyService;
@@ -451,12 +452,23 @@ public class PartitionReplicaListenerTest extends IgniteAbstractTest {
             return CompletableFuture.failedFuture(new Exception("Test exception"));
         }).when(messagingService).invoke(any(ClusterNode.class), any(), anyLong());
 
+        ClusterNodeResolver clusterNodeResolver = new ClusterNodeResolver() {
+            @Override
+            public ClusterNode getById(String id) {
+                return id.equals(localNode.id()) ? localNode : anotherNode;
+            }
+
+            @Override
+            public ClusterNode getByConsistentId(String consistentId) {
+                return consistentId.equals(localNode.name()) ? localNode : anotherNode;
+            }
+        };
+
         transactionStateResolver = new TransactionStateResolver(
                 mock(ReplicaService.class),
                 txManager,
                 clock,
-                consistentId -> consistentId.equals(localNode.name()) ? localNode : anotherNode,
-                id -> id.equals(localNode.id()) ? localNode : anotherNode,
+                clusterNodeResolver,
                 messagingService
         );
 
diff --git a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImplTest.java b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImplTest.java
index f06e86eede..9f128d6b59 100644
--- a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImplTest.java
+++ b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImplTest.java
@@ -49,6 +49,7 @@ import org.apache.ignite.internal.tx.TxManager;
 import org.apache.ignite.internal.tx.storage.state.TxStateTableStorage;
 import org.apache.ignite.internal.util.PendingComparableValuesTracker;
 import org.apache.ignite.network.ClusterNode;
+import org.apache.ignite.network.SingleClusterNodeResolver;
 import org.junit.jupiter.api.Test;
 
 /**
@@ -62,7 +63,7 @@ public class InternalTableImplTest extends BaseIgniteAbstractTest {
                 1,
                 Int2ObjectMaps.emptyMap(),
                 1,
-                s -> mock(ClusterNode.class),
+                new SingleClusterNodeResolver(mock(ClusterNode.class)),
                 mock(TxManager.class),
                 mock(MvTableStorage.class),
                 mock(TxStateTableStorage.class),
@@ -108,7 +109,7 @@ public class InternalTableImplTest extends BaseIgniteAbstractTest {
                 1,
                 Int2ObjectMaps.emptyMap(),
                 3,
-                s -> mock(ClusterNode.class),
+                new SingleClusterNodeResolver(mock(ClusterNode.class)),
                 mock(TxManager.class),
                 mock(MvTableStorage.class),
                 mock(TxStateTableStorage.class),
diff --git a/modules/table/src/testFixtures/java/org/apache/ignite/distributed/ItTxTestCluster.java b/modules/table/src/testFixtures/java/org/apache/ignite/distributed/ItTxTestCluster.java
index e54eedd99e..bd3ea99b3c 100644
--- a/modules/table/src/testFixtures/java/org/apache/ignite/distributed/ItTxTestCluster.java
+++ b/modules/table/src/testFixtures/java/org/apache/ignite/distributed/ItTxTestCluster.java
@@ -54,7 +54,6 @@ import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.Executor;
 import java.util.concurrent.ScheduledThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
-import java.util.function.Function;
 import java.util.function.Supplier;
 import java.util.stream.IntStream;
 import org.apache.ignite.internal.affinity.AffinityUtils;
@@ -132,6 +131,7 @@ import org.apache.ignite.internal.util.IgniteUtils;
 import org.apache.ignite.internal.util.Lazy;
 import org.apache.ignite.internal.util.PendingComparableValuesTracker;
 import org.apache.ignite.network.ClusterNode;
+import org.apache.ignite.network.ClusterNodeResolver;
 import org.apache.ignite.network.ClusterService;
 import org.apache.ignite.network.NetworkAddress;
 import org.apache.ignite.network.NodeFinder;
@@ -140,6 +140,7 @@ import org.apache.ignite.raft.jraft.RaftMessagesFactory;
 import org.apache.ignite.raft.jraft.rpc.impl.RaftGroupEventsClientListener;
 import org.apache.ignite.tx.IgniteTransactions;
 import org.apache.ignite.utils.ClusterServiceTestUtils;
+import org.jetbrains.annotations.Nullable;
 import org.junit.jupiter.api.TestInfo;
 
 /**
@@ -206,32 +207,37 @@ public class ItTxTestCluster {
 
     protected String localNodeName;
 
-    private final Function<String, ClusterNode> consistentIdToNode = consistentId -> {
-        for (ClusterService service : cluster) {
-            ClusterNode clusterNode = service.topologyService().localMember();
+    private final ClusterNodeResolver nodeResolver = new ClusterNodeResolver() {
 
-            if (clusterNode.name().equals(consistentId)) {
-                return clusterNode;
+        @Override
+        public @Nullable ClusterNode getById(String id) {
+            for (ClusterService service : cluster) {
+                ClusterNode clusterNode = service.topologyService().localMember();
+
+                if (clusterNode.id().equals(id)) {
+                    return clusterNode;
+                }
             }
-        }
 
-        return null;
-    };
+            if (client != null && client.topologyService().localMember().id().equals(id)) {
+                return client.topologyService().localMember();
+            }
 
-    private final Function<String, ClusterNode> idToNode = id -> {
-        for (ClusterService service : cluster) {
-            ClusterNode clusterNode = service.topologyService().localMember();
+            return null;
+        }
+
+        @Override
+        public @Nullable ClusterNode getByConsistentId(String consistentId) {
+            for (ClusterService service : cluster) {
+                ClusterNode clusterNode = service.topologyService().localMember();
 
-            if (clusterNode.id().equals(id)) {
-                return clusterNode;
+                if (clusterNode.name().equals(consistentId)) {
+                    return clusterNode;
+                }
             }
-        }
 
-        if (client != null && client.topologyService().localMember().id().equals(id)) {
-            return client.topologyService().localMember();
+            return null;
         }
-
-        return null;
     };
 
     private final TestInfo testInfo;
@@ -462,8 +468,7 @@ public class ItTxTestCluster {
                         replicaServices.get(assignment),
                         txManagers.get(assignment),
                         clocks.get(assignment),
-                        consistentIdToNode,
-                        idToNode,
+                        nodeResolver,
                         clusterServices.get(assignment).messagingService()
                 );
                 transactionStateResolver.start();
@@ -552,7 +557,7 @@ public class ItTxTestCluster {
                                         transactionStateResolver,
                                         storageUpdateHandler,
                                         new DummyValidationSchemasSource(schemaManager),
-                                        consistentIdToNode.apply(assignment),
+                                        nodeResolver.getByConsistentId(assignment),
                                         new AlwaysSyncedSchemaSyncService(),
                                         catalogService,
                                         placementDriver
@@ -617,7 +622,7 @@ public class ItTxTestCluster {
                         tableId,
                         clients,
                         1,
-                        consistentIdToNode,
+                        nodeResolver,
                         clientTxManager,
                         mock(MvTableStorage.class),
                         mock(TxStateTableStorage.class),
@@ -839,8 +844,7 @@ public class ItTxTestCluster {
                 clientReplicaSvc,
                 clientTxManager,
                 clientClock,
-                consistentIdToNode,
-                idToNode,
+                nodeResolver,
                 client.messagingService()
         );
 
diff --git a/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/impl/DummyInternalTableImpl.java b/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/impl/DummyInternalTableImpl.java
index e6e87bcccb..fe6fa1ceae 100644
--- a/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/impl/DummyInternalTableImpl.java
+++ b/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/impl/DummyInternalTableImpl.java
@@ -102,6 +102,7 @@ import org.apache.ignite.network.ClusterNodeImpl;
 import org.apache.ignite.network.ClusterService;
 import org.apache.ignite.network.NetworkAddress;
 import org.apache.ignite.network.NetworkMessage;
+import org.apache.ignite.network.SingleClusterNodeResolver;
 import org.apache.ignite.network.TopologyService;
 import org.apache.ignite.tx.TransactionException;
 import org.jetbrains.annotations.Nullable;
@@ -232,7 +233,7 @@ public class DummyInternalTableImpl extends InternalTableImpl {
                 nextTableId.getAndIncrement(),
                 Int2ObjectMaps.singleton(PART_ID, mock(RaftGroupService.class)),
                 1,
-                name -> LOCAL_NODE,
+                new SingleClusterNodeResolver(LOCAL_NODE),
                 txManager,
                 mock(MvTableStorage.class),
                 new TestTxStateTableStorage(),