You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sa...@apache.org on 2023/10/02 09:31:15 UTC

[ignite-3] branch main updated: IGNITE-20055 Durable txCleanupReplicaRequest send from the commit patition (#2624)

This is an automated email from the ASF dual-hosted git repository.

sanpwc pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new 823142d4b7 IGNITE-20055 Durable txCleanupReplicaRequest send from the commit patition (#2624)
823142d4b7 is described below

commit 823142d4b76bf6a524c672b7b05840ac0e393663
Author: Cyrill <cy...@gmail.com>
AuthorDate: Mon Oct 2 12:31:08 2023 +0300

    IGNITE-20055 Durable txCleanupReplicaRequest send from the commit patition (#2624)
---
 .../apache/ignite/client/fakes/FakeTxManager.java  |  17 +++-
 .../ignite/internal/replicator/ReplicaManager.java |   5 +-
 .../replicator/PartitionReplicaListener.java       | 113 ++++++++++++++-------
 .../replicator/SchemaCompatValidator.java          |   3 +-
 .../RepeatedFinishReadWriteTransactionTest.java    |   6 +-
 .../internal/table/TxLocalCleanupRecoveryTest.java |  70 +++++++++++++
 .../apache/ignite/internal/table/TxLocalTest.java  |  34 +++++++
 .../replication/PartitionReplicaListenerTest.java  |   2 -
 .../ignite/internal/table/TxAbstractTest.java      |   2 +-
 .../table/impl/DummyInternalTableImpl.java         |   9 ++
 .../org/apache/ignite/internal/tx/TxManager.java   |   8 +-
 .../internal/tx/impl/ReadWriteTransactionImpl.java |  20 ++--
 .../ignite/internal/tx/impl/TxManagerImpl.java     |  56 +++++-----
 .../tx/message/TxCleanupReplicaRequest.java        |  11 +-
 14 files changed, 246 insertions(+), 110 deletions(-)

diff --git a/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeTxManager.java b/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeTxManager.java
index b128afb93d..f8b5487c84 100644
--- a/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeTxManager.java
+++ b/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeTxManager.java
@@ -17,6 +17,8 @@
 
 package org.apache.ignite.client.fakes;
 
+import static java.util.concurrent.CompletableFuture.completedFuture;
+
 import java.util.List;
 import java.util.Map;
 import java.util.UUID;
@@ -111,7 +113,7 @@ public class FakeTxManager implements TxManager {
 
             @Override
             public CompletableFuture<Void> commitAsync() {
-                return CompletableFuture.completedFuture(null);
+                return completedFuture(null);
             }
 
             @Override
@@ -121,7 +123,7 @@ public class FakeTxManager implements TxManager {
 
             @Override
             public CompletableFuture<Void> rollbackAsync() {
-                return CompletableFuture.completedFuture(null);
+                return completedFuture(null);
             }
 
             @Override
@@ -179,9 +181,14 @@ public class FakeTxManager implements TxManager {
     }
 
     @Override
-    public CompletableFuture<Void> cleanup(ClusterNode recipientNode, List<IgniteBiTuple<TablePartitionId, Long>> tablePartitionIds,
-            UUID txId, boolean commit, @Nullable HybridTimestamp commitTimestamp) {
-        return null;
+    public CompletableFuture<Void> cleanup(
+            String primaryConsistentId,
+            TablePartitionId tablePartitionId,
+            UUID txId,
+            boolean commit,
+            @Nullable HybridTimestamp commitTimestamp
+    ) {
+        return completedFuture(null);
     }
 
     @Override
diff --git a/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaManager.java b/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaManager.java
index 48a179ad68..c5599f9639 100644
--- a/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaManager.java
+++ b/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaManager.java
@@ -212,13 +212,10 @@ public class ReplicaManager implements IgniteComponent {
                                     }
                                 }
                         );
-
-                        return replicaFut;
                     } else {
                         sendAwaitReplicaResponse(senderConsistentId, correlationId);
-
-                        return replicaFut;
                     }
+                    return replicaFut;
                 });
 
                 return;
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
index 5421a01ed8..397e416a39 100644
--- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
@@ -32,10 +32,12 @@ import static org.apache.ignite.internal.tx.TxState.COMMITED;
 import static org.apache.ignite.internal.tx.TxState.PENDING;
 import static org.apache.ignite.internal.tx.TxState.isFinalState;
 import static org.apache.ignite.internal.util.CollectionUtils.nullOrEmpty;
+import static org.apache.ignite.internal.util.ExceptionUtils.withCause;
 import static org.apache.ignite.internal.util.IgniteUtils.findAny;
 import static org.apache.ignite.internal.util.IgniteUtils.findFirst;
 import static org.apache.ignite.internal.util.IgniteUtils.inBusyLock;
 import static org.apache.ignite.internal.util.IgniteUtils.inBusyLockAsync;
+import static org.apache.ignite.lang.ErrorGroups.Replicator.REPLICA_UNAVAILABLE_ERR;
 import static org.apache.ignite.lang.ErrorGroups.Transactions.TX_FAILED_READ_WRITE_OPERATION_ERR;
 
 import java.nio.ByteBuffer;
@@ -56,11 +58,12 @@ import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.ConcurrentNavigableMap;
 import java.util.concurrent.ConcurrentSkipListMap;
 import java.util.concurrent.Executor;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.BiFunction;
+import java.util.function.Function;
 import java.util.function.Predicate;
 import java.util.function.Supplier;
 import java.util.stream.Collectors;
@@ -168,6 +171,10 @@ public class PartitionReplicaListener implements ReplicaListener {
     /** Logger. */
     private static final IgniteLogger LOG = Loggers.forClass(PartitionReplicaListener.class);
 
+    private static final int AWAIT_PRIMARY_REPLICA_TIMEOUT = 10;
+
+    private static final int ATTEMPTS_TO_CLEANUP_REPLICA = 5;
+
     /** Factory to create RAFT command messages. */
     private static final TableMessagesFactory MSG_FACTORY = new TableMessagesFactory();
 
@@ -1173,14 +1180,15 @@ public class PartitionReplicaListener implements ReplicaListener {
         UUID txId = request.txId();
 
         if (request.commit()) {
-            return schemaCompatValidator.validateForward(txId, aggregatedGroupIds, request.commitTimestamp())
-                    .thenCompose(validationResult -> {
-                        return finishAndCleanup(request, validationResult.isSuccessful(), aggregatedGroupIds, txId, txCoordinatorId)
-                                .thenAccept(unused -> throwIfSchemaValidationOnCommitFailed(validationResult));
-                    });
+            HybridTimestamp commitTimestamp = request.commitTimestamp();
+
+            return schemaCompatValidator.validateForward(txId, aggregatedGroupIds, commitTimestamp)
+                    .thenCompose(validationResult ->
+                            finishAndCleanup(aggregatedGroupIds, validationResult.isSuccessful(), commitTimestamp, txId, txCoordinatorId)
+                                    .thenAccept(unused -> throwIfSchemaValidationOnCommitFailed(validationResult)));
         } else {
             // Aborting.
-            return finishAndCleanup(request, false, aggregatedGroupIds, txId, txCoordinatorId);
+            return finishAndCleanup(aggregatedGroupIds, false, null, txId, txCoordinatorId);
         }
     }
 
@@ -1193,33 +1201,74 @@ public class PartitionReplicaListener implements ReplicaListener {
     }
 
     private CompletableFuture<Void> finishAndCleanup(
-            TxFinishReplicaRequest request,
+            Collection<TablePartitionId> enlistedPartitions,
             boolean commit,
-            List<TablePartitionId> aggregatedGroupIds,
+            @Nullable HybridTimestamp commitTimestamp,
             UUID txId,
             String txCoordinatorId
     ) {
-        HybridTimestamp commitTimestamp = request.commitTimestamp();
-
-        CompletableFuture<?> changeStateFuture = finishTransaction(aggregatedGroupIds, txId, commit, commitTimestamp, txCoordinatorId);
-
-        CompletableFuture<?>[] cleanupFutures = new CompletableFuture[request.groups().size()];
-        AtomicInteger cleanupFuturesCnt = new AtomicInteger(0);
-
-        request.groups().forEach(
-                (recipientNode, tablePartitionIds) ->
-                        cleanupFutures[cleanupFuturesCnt.getAndIncrement()] = changeStateFuture.thenCompose(ignored ->
-                                txManager.cleanup(
-                                        recipientNode,
-                                        tablePartitionIds,
-                                        txId,
-                                        commit,
-                                        request.commitTimestamp()
-                                )
-                        )
-        );
+        CompletableFuture<?> changeStateFuture = finishTransaction(enlistedPartitions, txId, commit, commitTimestamp, txCoordinatorId);
+
+        CompletableFuture<?>[] futures = enlistedPartitions.stream()
+                .map(partitionId -> changeStateFuture.thenCompose(ignored ->
+                        cleanupWithRetry(commit, commitTimestamp, txId, partitionId, ATTEMPTS_TO_CLEANUP_REPLICA)))
+                .toArray(size -> new CompletableFuture<?>[size]);
+
+        return allOf(futures);
+    }
+
+    private CompletableFuture<Void> cleanupWithRetry(
+            boolean commit,
+            @Nullable HybridTimestamp commitTimestamp,
+            UUID txId,
+            TablePartitionId partitionId,
+            int attempts) {
+        HybridTimestamp now = hybridClock.now();
+
+        return findPrimaryReplica(partitionId, now)
+                .thenCompose(leaseHolder ->
+                        cleanupWithRetryOnReplica(commit, commitTimestamp, txId, partitionId, leaseHolder, attempts));
+    }
 
-        return allOf(cleanupFutures);
+
+    private CompletableFuture<Void> cleanupWithRetryOnReplica(
+            boolean commit,
+            @Nullable HybridTimestamp commitTimestamp,
+            UUID txId,
+            TablePartitionId partitionId,
+            String primaryConsistentId,
+            int attempts) {
+        return txManager.cleanup(primaryConsistentId, partitionId, txId, commit, commitTimestamp)
+                .handle((res, ex) -> {
+                    if (ex != null) {
+                        LOG.warn("Failed to perform cleanup on Tx {}." + (attempts > 0 ? " The operation will be retried." : ""), txId, ex);
+
+                        if (attempts > 0) {
+                            return cleanupWithRetry(commit, commitTimestamp, txId, partitionId, attempts - 1);
+                        }
+
+                        return CompletableFuture.<Void>failedFuture(ex);
+                    }
+
+                    return CompletableFuture.<Void>completedFuture(null);
+                })
+                .thenCompose(Function.identity());
+    }
+
+    private CompletableFuture<String> findPrimaryReplica(TablePartitionId partitionId, HybridTimestamp now) {
+        return placementDriver.awaitPrimaryReplica(partitionId, now)
+                .orTimeout(AWAIT_PRIMARY_REPLICA_TIMEOUT, TimeUnit.SECONDS)
+                .handle((primaryReplica, e) -> {
+                    if (e != null) {
+                        LOG.error("Failed to retrieve primary replica for partition {}", partitionId, e);
+
+                        throw withCause(TransactionException::new, REPLICA_UNAVAILABLE_ERR,
+                                "Failed to get the primary replica"
+                                        + " [tablePartitionId=" + partitionId + ", awaitTimestamp=" + now + ']', e);
+                    }
+
+                    return primaryReplica.getLeaseholder();
+                });
     }
 
     /**
@@ -1233,7 +1282,7 @@ public class PartitionReplicaListener implements ReplicaListener {
      * @return Future to wait of the finish.
      */
     private CompletableFuture<Object> finishTransaction(
-            List<TablePartitionId> aggregatedGroupIds,
+            Collection<TablePartitionId> aggregatedGroupIds,
             UUID txId,
             boolean commit,
             @Nullable HybridTimestamp commitTimestamp,
@@ -2416,10 +2465,6 @@ public class PartitionReplicaListener implements ReplicaListener {
         } else if (request instanceof TxFinishReplicaRequest) {
             expectedTerm = ((TxFinishReplicaRequest) request).term();
 
-            assert expectedTerm != null;
-        } else if (request instanceof TxCleanupReplicaRequest) {
-            expectedTerm = ((TxCleanupReplicaRequest) request).term();
-
             assert expectedTerm != null;
         } else {
             expectedTerm = null;
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/SchemaCompatValidator.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/SchemaCompatValidator.java
index 327457633a..9b37043388 100644
--- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/SchemaCompatValidator.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/SchemaCompatValidator.java
@@ -19,6 +19,7 @@ package org.apache.ignite.internal.table.distributed.replicator;
 
 import static java.util.stream.Collectors.toSet;
 
+import java.util.Collection;
 import java.util.List;
 import java.util.Set;
 import java.util.UUID;
@@ -58,7 +59,7 @@ class SchemaCompatValidator {
      */
     CompletableFuture<CompatValidationResult> validateForward(
             UUID txId,
-            List<TablePartitionId> enlistedGroupIds,
+            Collection<TablePartitionId> enlistedGroupIds,
             @Nullable HybridTimestamp commitTimestamp
     ) {
         HybridTimestamp beginTimestamp = TransactionIds.beginTimestamp(txId);
diff --git a/modules/table/src/test/java/org/apache/ignite/internal/table/RepeatedFinishReadWriteTransactionTest.java b/modules/table/src/test/java/org/apache/ignite/internal/table/RepeatedFinishReadWriteTransactionTest.java
index d36b9d9349..ed114e14d4 100644
--- a/modules/table/src/test/java/org/apache/ignite/internal/table/RepeatedFinishReadWriteTransactionTest.java
+++ b/modules/table/src/test/java/org/apache/ignite/internal/table/RepeatedFinishReadWriteTransactionTest.java
@@ -292,13 +292,13 @@ public class RepeatedFinishReadWriteTransactionTest extends BaseIgniteAbstractTe
 
         @Override
         public CompletableFuture<Void> cleanup(
-                ClusterNode recipientNode,
-                List<IgniteBiTuple<TablePartitionId, Long>> tablePartitionIds,
+                String primaryConsistentId,
+                TablePartitionId tablePartitionId,
                 UUID txId,
                 boolean commit,
                 @Nullable HybridTimestamp commitTimestamp
         ) {
-            return null;
+            return completedFuture(null);
         }
 
         @Override
diff --git a/modules/table/src/test/java/org/apache/ignite/internal/table/TxLocalCleanupRecoveryTest.java b/modules/table/src/test/java/org/apache/ignite/internal/table/TxLocalCleanupRecoveryTest.java
new file mode 100644
index 0000000000..6e8e36aae2
--- /dev/null
+++ b/modules/table/src/test/java/org/apache/ignite/internal/table/TxLocalCleanupRecoveryTest.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.table;
+
+import static org.apache.ignite.lang.ErrorGroups.Replicator.REPLICA_COMMON_ERR;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.ignite.internal.replicator.exception.ReplicationException;
+import org.apache.ignite.internal.replicator.message.ReplicaRequest;
+import org.apache.ignite.internal.tx.message.TxCleanupReplicaRequest;
+import org.apache.ignite.tx.TransactionException;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.mockito.invocation.InvocationOnMock;
+
+/**
+ * Durable cleanup test with successfull recovery after the fauilures.
+ */
+public class TxLocalCleanupRecoveryTest extends TxLocalTest {
+
+    private AtomicInteger failureCounter;
+
+    @BeforeEach
+    void initTest() {
+        // The value of 3 is less than the allowed number of cleanup retries.
+        failureCounter = new AtomicInteger(3);
+    }
+
+
+    @Override
+    protected MethodAnswer invokeOnMessagingMock(InvocationOnMock invocationOnMock) {
+        ReplicaRequest request = invocationOnMock.getArgument(1);
+
+        if (request instanceof TxCleanupReplicaRequest && failureCounter.getAndDecrement() > 0) {
+            return new MethodAnswer(CompletableFuture.failedFuture(new ReplicationException(
+                    REPLICA_COMMON_ERR,
+                    "Test Tx Cleanup exception [replicaGroupId=" + request.groupId() + ']')));
+        }
+        // Otherwise use the parent stub.
+        return super.invokeOnMessagingMock(invocationOnMock);
+    }
+
+
+    @Test
+    @Override
+    public void testDeleteUpsertCommit() throws TransactionException {
+        // The value of 6 is higher than the default retry count.
+        // So we should give up retrying and crash.
+        failureCounter = new AtomicInteger(6);
+
+        assertThrows(TransactionException.class, () -> deleteUpsert().commit());
+    }
+}
diff --git a/modules/table/src/test/java/org/apache/ignite/internal/table/TxLocalTest.java b/modules/table/src/test/java/org/apache/ignite/internal/table/TxLocalTest.java
index 149f965642..bc69c327d2 100644
--- a/modules/table/src/test/java/org/apache/ignite/internal/table/TxLocalTest.java
+++ b/modules/table/src/test/java/org/apache/ignite/internal/table/TxLocalTest.java
@@ -50,9 +50,11 @@ import org.apache.ignite.internal.tx.impl.TxManagerImpl;
 import org.apache.ignite.network.ClusterService;
 import org.apache.ignite.network.MessagingService;
 import org.apache.ignite.table.Table;
+import org.jetbrains.annotations.Nullable;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Disabled;
+import org.mockito.invocation.InvocationOnMock;
 
 /**
  * Local table tests.
@@ -81,6 +83,12 @@ public class TxLocalTest extends TxAbstractTest {
 
         Map<ReplicationGroupId, DummyInternalTableImpl> tables = new HashMap<>();
         doAnswer(invocationOnMock -> {
+            MethodAnswer answer = invokeOnMessagingMock(invocationOnMock);
+
+            if (answer.hasAnswer) {
+                return answer.answer;
+            }
+
             ReplicaRequest request = invocationOnMock.getArgument(1);
             ReplicaListener replicaListener = tables.get(request.groupId()).getReplicaListener();
 
@@ -151,6 +159,10 @@ public class TxLocalTest extends TxAbstractTest {
         tables.put(table2.groupId(), table2);
     }
 
+    protected MethodAnswer invokeOnMessagingMock(InvocationOnMock invocationOnMock) {
+        return MethodAnswer.NO_ANSWER;
+    }
+
     @AfterEach
     public void tearDown() throws Exception {
         if (txManager != null) {
@@ -188,4 +200,26 @@ public class TxLocalTest extends TxAbstractTest {
     protected Collection<TxManager> txManagers() {
         return List.of(txManager);
     }
+
+    /**
+     * A class that represents the result of method execution.
+     */
+    protected static class MethodAnswer {
+        static final MethodAnswer NO_ANSWER = new MethodAnswer(null, false);
+
+        /** The result of method invocation. */
+        final @Nullable Object answer;
+
+        /** If {@code true} the method was called. */
+        final boolean hasAnswer;
+
+        public MethodAnswer(@Nullable Object answer, boolean hasAnswer) {
+            this.answer = answer;
+            this.hasAnswer = hasAnswer;
+        }
+
+        public MethodAnswer(@Nullable Object answer) {
+            this(answer, true);
+        }
+    }
 }
diff --git a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerTest.java b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerTest.java
index a921284989..abc281d7a4 100644
--- a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerTest.java
+++ b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerTest.java
@@ -1276,7 +1276,6 @@ public class PartitionReplicaListenerTest extends IgniteAbstractTest {
                         .txId(txId)
                         .commit(true)
                         .commitTimestampLong(now.longValue())
-                        .term(1L)
                         .build(),
                     localNode.id()
             );
@@ -2025,7 +2024,6 @@ public class PartitionReplicaListenerTest extends IgniteAbstractTest {
                     .txId(txId)
                     .commit(true)
                     .commitTimestampLong(commitTs.longValue())
-                    .term(1L)
                     .build(),
                 localNode.id()
         ).join();
diff --git a/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/TxAbstractTest.java b/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/TxAbstractTest.java
index 69cc7409ef..d5f9fa98d1 100644
--- a/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/TxAbstractTest.java
+++ b/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/TxAbstractTest.java
@@ -161,7 +161,7 @@ public abstract class TxAbstractTest extends IgniteAbstractTest {
         assertEquals(100., accounts.recordView().get(null, makeKey(1)).doubleValue("balance"));
     }
 
-    private InternalTransaction deleteUpsert() {
+    protected InternalTransaction deleteUpsert() {
         accounts.recordView().upsert(null, makeValue(1, 100.));
 
         InternalTransaction tx = (InternalTransaction) igniteTransactions.begin();
diff --git a/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/impl/DummyInternalTableImpl.java b/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/impl/DummyInternalTableImpl.java
index f7efbf495e..33facb20be 100644
--- a/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/impl/DummyInternalTableImpl.java
+++ b/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/impl/DummyInternalTableImpl.java
@@ -21,6 +21,7 @@ import static java.util.concurrent.CompletableFuture.completedFuture;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.anyInt;
 import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.ArgumentMatchers.anyString;
 import static org.mockito.Mockito.lenient;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
@@ -257,6 +258,14 @@ public class DummyInternalTableImpl extends InternalTableImpl {
                         return replicaListener.invoke(invocationOnMock.getArgument(1), node.id());
                     })
                     .when(replicaSvc).invoke(any(ClusterNode.class), any());
+
+            lenient()
+                    .doAnswer(invocationOnMock -> {
+                        String nodeId = invocationOnMock.getArgument(0);
+
+                        return replicaListener.invoke(invocationOnMock.getArgument(1), nodeId);
+                    })
+                    .when(replicaSvc).invoke(anyString(), any());
         }
 
         AtomicLong raftIndex = new AtomicLong();
diff --git a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/TxManager.java b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/TxManager.java
index c0fbd57a02..280cc91e74 100644
--- a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/TxManager.java
+++ b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/TxManager.java
@@ -128,16 +128,16 @@ public interface TxManager extends IgniteComponent {
     /**
      * Sends cleanup request to the specified primary replica.
      *
-     * @param recipientNode Primary replica to process given cleanup request.
-     * @param tablePartitionIds Table partition ids with raft terms.
+     * @param primaryConsistentId  A consistent id of the primary replica node.
+     * @param tablePartitionId Table partition id.
      * @param txId Transaction id.
      * @param commit {@code True} if a commit requested.
      * @param commitTimestamp Commit timestamp ({@code null} if it's an abort).
      * @return Completable future of Void.
      */
     CompletableFuture<Void> cleanup(
-            ClusterNode recipientNode,
-            List<IgniteBiTuple<TablePartitionId, Long>> tablePartitionIds,
+            String primaryConsistentId,
+            TablePartitionId tablePartitionId,
             UUID txId,
             boolean commit,
             @Nullable HybridTimestamp commitTimestamp
diff --git a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/ReadWriteTransactionImpl.java b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/ReadWriteTransactionImpl.java
index e191f8ca2d..f26090ab2f 100644
--- a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/ReadWriteTransactionImpl.java
+++ b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/ReadWriteTransactionImpl.java
@@ -120,22 +120,14 @@ public class ReadWriteTransactionImpl extends IgniteAbstractTransactionImpl {
                             Map<ClusterNode, List<IgniteBiTuple<TablePartitionId, Long>>> groups = new LinkedHashMap<>();
 
                             if (!enlisted.isEmpty()) {
-                                enlisted.forEach((groupId, groupMeta) -> {
-                                    ClusterNode recipientNode = groupMeta.get1();
+                                enlisted.forEach((groupId, groupMeta) ->
+                                        groups.computeIfAbsent(groupMeta.get1(), clusterNode -> new ArrayList<>())
+                                                .add(new IgniteBiTuple<>(groupId, groupMeta.get2())));
 
-                                    if (groups.containsKey(recipientNode)) {
-                                        groups.get(recipientNode).add(new IgniteBiTuple<>(groupId, groupMeta.get2()));
-                                    } else {
-                                        List<IgniteBiTuple<TablePartitionId, Long>> items = new ArrayList<>();
+                                IgniteBiTuple<ClusterNode, Long> nodeAndTerm = enlisted.get(commitPart);
 
-                                        items.add(new IgniteBiTuple<>(groupId, groupMeta.get2()));
-
-                                        groups.put(recipientNode, items);
-                                    }
-                                });
-
-                                ClusterNode recipientNode = enlisted.get(commitPart).get1();
-                                Long term = enlisted.get(commitPart).get2();
+                                ClusterNode recipientNode = nodeAndTerm.get1();
+                                Long term = nodeAndTerm.get2();
 
                                 LOG.debug("Finish [recipientNode={}, term={} commit={}, txId={}, groups={}",
                                         recipientNode, term, commit, id(), groups);
diff --git a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxManagerImpl.java b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxManagerImpl.java
index ca96d6cb3a..f8adbe49a4 100644
--- a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxManagerImpl.java
+++ b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxManagerImpl.java
@@ -294,51 +294,43 @@ public class TxManagerImpl implements TxManager {
                 .build();
 
         return replicaService.invoke(recipientNode, req)
-                .thenRun(() -> {
-                    updateTxMeta(txId, old -> {
-                        if (isFinalState(old.txState())) {
-                            finishingStateMeta.txFinishFuture().complete(old);
+                .thenRun(() ->
+                        updateTxMeta(txId, old -> {
+                            if (isFinalState(old.txState())) {
+                                finishingStateMeta.txFinishFuture().complete(old);
 
-                            return old;
-                        }
+                                return old;
+                            }
 
-                        assert old instanceof TxStateMetaFinishing;
+                            assert old instanceof TxStateMetaFinishing;
 
-                        TxStateMeta finalTxStateMeta = coordinatorFinalTxStateMeta(commit, commitTimestamp);
+                            TxStateMeta finalTxStateMeta = coordinatorFinalTxStateMeta(commit, commitTimestamp);
 
-                        finishingStateMeta.txFinishFuture().complete(finalTxStateMeta);
+                            finishingStateMeta.txFinishFuture().complete(finalTxStateMeta);
 
-                        return finalTxStateMeta;
-                    });
-                });
+                            return finalTxStateMeta;
+                        })
+                );
     }
 
     @Override
     public CompletableFuture<Void> cleanup(
-            ClusterNode recipientNode,
-            List<IgniteBiTuple<TablePartitionId, Long>> tablePartitionIds,
+            String primaryConsistentId,
+            TablePartitionId tablePartitionId,
             UUID txId,
             boolean commit,
             @Nullable HybridTimestamp commitTimestamp
     ) {
-        var cleanupFutures = new CompletableFuture[tablePartitionIds.size()];
-
-        // TODO: https://issues.apache.org/jira/browse/IGNITE-17582 Grouping replica requests.
-        for (int i = 0; i < tablePartitionIds.size(); i++) {
-            cleanupFutures[i] = replicaService.invoke(
-                    recipientNode,
-                    FACTORY.txCleanupReplicaRequest()
-                            .groupId(tablePartitionIds.get(i).get1())
-                            .timestampLong(clock.nowLong())
-                            .txId(txId)
-                            .commit(commit)
-                            .commitTimestampLong(hybridTimestampToLong(commitTimestamp))
-                            .term(tablePartitionIds.get(i).get2())
-                            .build()
-            );
-        }
-
-        return allOf(cleanupFutures);
+        return replicaService.invoke(
+                primaryConsistentId,
+                FACTORY.txCleanupReplicaRequest()
+                        .groupId(tablePartitionId)
+                        .timestampLong(clock.nowLong())
+                        .txId(txId)
+                        .commit(commit)
+                        .commitTimestampLong(hybridTimestampToLong(commitTimestamp))
+                        .build()
+        );
     }
 
     @Override
diff --git a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/message/TxCleanupReplicaRequest.java b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/message/TxCleanupReplicaRequest.java
index d5503e71cf..2032cf1352 100644
--- a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/message/TxCleanupReplicaRequest.java
+++ b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/message/TxCleanupReplicaRequest.java
@@ -34,7 +34,7 @@ import org.jetbrains.annotations.Nullable;
  *      <li>Release all locks that were held on local replica by given transaction.</li>
  *  </ol>
  */
-@Transferable(value = TxMessageGroup.TX_CLEANUP_REQUEST)
+@Transferable(TxMessageGroup.TX_CLEANUP_REQUEST)
 public interface TxCleanupReplicaRequest extends ReplicaRequest, TimestampAware {
     /**
      * Returns transaction Id.
@@ -65,13 +65,4 @@ public interface TxCleanupReplicaRequest extends ReplicaRequest, TimestampAware
     default @Nullable HybridTimestamp commitTimestamp() {
         return nullableHybridTimestamp(commitTimestampLong());
     }
-
-    /**
-     * Gets a raft term.
-     * TODO: A temp solution until lease-based engine will be implemented (IGNITE-17256, IGNITE-15083)
-     *
-     * @return Raft term.
-     */
-    @Deprecated
-    Long term();
 }