You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sa...@apache.org on 2023/10/02 09:31:15 UTC
[ignite-3] branch main updated: IGNITE-20055 Durable txCleanupReplicaRequest send from the commit patition (#2624)
This is an automated email from the ASF dual-hosted git repository.
sanpwc pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new 823142d4b7 IGNITE-20055 Durable txCleanupReplicaRequest send from the commit patition (#2624)
823142d4b7 is described below
commit 823142d4b76bf6a524c672b7b05840ac0e393663
Author: Cyrill <cy...@gmail.com>
AuthorDate: Mon Oct 2 12:31:08 2023 +0300
IGNITE-20055 Durable txCleanupReplicaRequest send from the commit patition (#2624)
---
.../apache/ignite/client/fakes/FakeTxManager.java | 17 +++-
.../ignite/internal/replicator/ReplicaManager.java | 5 +-
.../replicator/PartitionReplicaListener.java | 113 ++++++++++++++-------
.../replicator/SchemaCompatValidator.java | 3 +-
.../RepeatedFinishReadWriteTransactionTest.java | 6 +-
.../internal/table/TxLocalCleanupRecoveryTest.java | 70 +++++++++++++
.../apache/ignite/internal/table/TxLocalTest.java | 34 +++++++
.../replication/PartitionReplicaListenerTest.java | 2 -
.../ignite/internal/table/TxAbstractTest.java | 2 +-
.../table/impl/DummyInternalTableImpl.java | 9 ++
.../org/apache/ignite/internal/tx/TxManager.java | 8 +-
.../internal/tx/impl/ReadWriteTransactionImpl.java | 20 ++--
.../ignite/internal/tx/impl/TxManagerImpl.java | 56 +++++-----
.../tx/message/TxCleanupReplicaRequest.java | 11 +-
14 files changed, 246 insertions(+), 110 deletions(-)
diff --git a/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeTxManager.java b/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeTxManager.java
index b128afb93d..f8b5487c84 100644
--- a/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeTxManager.java
+++ b/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeTxManager.java
@@ -17,6 +17,8 @@
package org.apache.ignite.client.fakes;
+import static java.util.concurrent.CompletableFuture.completedFuture;
+
import java.util.List;
import java.util.Map;
import java.util.UUID;
@@ -111,7 +113,7 @@ public class FakeTxManager implements TxManager {
@Override
public CompletableFuture<Void> commitAsync() {
- return CompletableFuture.completedFuture(null);
+ return completedFuture(null);
}
@Override
@@ -121,7 +123,7 @@ public class FakeTxManager implements TxManager {
@Override
public CompletableFuture<Void> rollbackAsync() {
- return CompletableFuture.completedFuture(null);
+ return completedFuture(null);
}
@Override
@@ -179,9 +181,14 @@ public class FakeTxManager implements TxManager {
}
@Override
- public CompletableFuture<Void> cleanup(ClusterNode recipientNode, List<IgniteBiTuple<TablePartitionId, Long>> tablePartitionIds,
- UUID txId, boolean commit, @Nullable HybridTimestamp commitTimestamp) {
- return null;
+ public CompletableFuture<Void> cleanup(
+ String primaryConsistentId,
+ TablePartitionId tablePartitionId,
+ UUID txId,
+ boolean commit,
+ @Nullable HybridTimestamp commitTimestamp
+ ) {
+ return completedFuture(null);
}
@Override
diff --git a/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaManager.java b/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaManager.java
index 48a179ad68..c5599f9639 100644
--- a/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaManager.java
+++ b/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaManager.java
@@ -212,13 +212,10 @@ public class ReplicaManager implements IgniteComponent {
}
}
);
-
- return replicaFut;
} else {
sendAwaitReplicaResponse(senderConsistentId, correlationId);
-
- return replicaFut;
}
+ return replicaFut;
});
return;
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
index 5421a01ed8..397e416a39 100644
--- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
@@ -32,10 +32,12 @@ import static org.apache.ignite.internal.tx.TxState.COMMITED;
import static org.apache.ignite.internal.tx.TxState.PENDING;
import static org.apache.ignite.internal.tx.TxState.isFinalState;
import static org.apache.ignite.internal.util.CollectionUtils.nullOrEmpty;
+import static org.apache.ignite.internal.util.ExceptionUtils.withCause;
import static org.apache.ignite.internal.util.IgniteUtils.findAny;
import static org.apache.ignite.internal.util.IgniteUtils.findFirst;
import static org.apache.ignite.internal.util.IgniteUtils.inBusyLock;
import static org.apache.ignite.internal.util.IgniteUtils.inBusyLockAsync;
+import static org.apache.ignite.lang.ErrorGroups.Replicator.REPLICA_UNAVAILABLE_ERR;
import static org.apache.ignite.lang.ErrorGroups.Transactions.TX_FAILED_READ_WRITE_OPERATION_ERR;
import java.nio.ByteBuffer;
@@ -56,11 +58,12 @@ import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ConcurrentNavigableMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.Executor;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiFunction;
+import java.util.function.Function;
import java.util.function.Predicate;
import java.util.function.Supplier;
import java.util.stream.Collectors;
@@ -168,6 +171,10 @@ public class PartitionReplicaListener implements ReplicaListener {
/** Logger. */
private static final IgniteLogger LOG = Loggers.forClass(PartitionReplicaListener.class);
+ private static final int AWAIT_PRIMARY_REPLICA_TIMEOUT = 10;
+
+ private static final int ATTEMPTS_TO_CLEANUP_REPLICA = 5;
+
/** Factory to create RAFT command messages. */
private static final TableMessagesFactory MSG_FACTORY = new TableMessagesFactory();
@@ -1173,14 +1180,15 @@ public class PartitionReplicaListener implements ReplicaListener {
UUID txId = request.txId();
if (request.commit()) {
- return schemaCompatValidator.validateForward(txId, aggregatedGroupIds, request.commitTimestamp())
- .thenCompose(validationResult -> {
- return finishAndCleanup(request, validationResult.isSuccessful(), aggregatedGroupIds, txId, txCoordinatorId)
- .thenAccept(unused -> throwIfSchemaValidationOnCommitFailed(validationResult));
- });
+ HybridTimestamp commitTimestamp = request.commitTimestamp();
+
+ return schemaCompatValidator.validateForward(txId, aggregatedGroupIds, commitTimestamp)
+ .thenCompose(validationResult ->
+ finishAndCleanup(aggregatedGroupIds, validationResult.isSuccessful(), commitTimestamp, txId, txCoordinatorId)
+ .thenAccept(unused -> throwIfSchemaValidationOnCommitFailed(validationResult)));
} else {
// Aborting.
- return finishAndCleanup(request, false, aggregatedGroupIds, txId, txCoordinatorId);
+ return finishAndCleanup(aggregatedGroupIds, false, null, txId, txCoordinatorId);
}
}
@@ -1193,33 +1201,74 @@ public class PartitionReplicaListener implements ReplicaListener {
}
private CompletableFuture<Void> finishAndCleanup(
- TxFinishReplicaRequest request,
+ Collection<TablePartitionId> enlistedPartitions,
boolean commit,
- List<TablePartitionId> aggregatedGroupIds,
+ @Nullable HybridTimestamp commitTimestamp,
UUID txId,
String txCoordinatorId
) {
- HybridTimestamp commitTimestamp = request.commitTimestamp();
-
- CompletableFuture<?> changeStateFuture = finishTransaction(aggregatedGroupIds, txId, commit, commitTimestamp, txCoordinatorId);
-
- CompletableFuture<?>[] cleanupFutures = new CompletableFuture[request.groups().size()];
- AtomicInteger cleanupFuturesCnt = new AtomicInteger(0);
-
- request.groups().forEach(
- (recipientNode, tablePartitionIds) ->
- cleanupFutures[cleanupFuturesCnt.getAndIncrement()] = changeStateFuture.thenCompose(ignored ->
- txManager.cleanup(
- recipientNode,
- tablePartitionIds,
- txId,
- commit,
- request.commitTimestamp()
- )
- )
- );
+ CompletableFuture<?> changeStateFuture = finishTransaction(enlistedPartitions, txId, commit, commitTimestamp, txCoordinatorId);
+
+ CompletableFuture<?>[] futures = enlistedPartitions.stream()
+ .map(partitionId -> changeStateFuture.thenCompose(ignored ->
+ cleanupWithRetry(commit, commitTimestamp, txId, partitionId, ATTEMPTS_TO_CLEANUP_REPLICA)))
+ .toArray(size -> new CompletableFuture<?>[size]);
+
+ return allOf(futures);
+ }
+
+ private CompletableFuture<Void> cleanupWithRetry(
+ boolean commit,
+ @Nullable HybridTimestamp commitTimestamp,
+ UUID txId,
+ TablePartitionId partitionId,
+ int attempts) {
+ HybridTimestamp now = hybridClock.now();
+
+ return findPrimaryReplica(partitionId, now)
+ .thenCompose(leaseHolder ->
+ cleanupWithRetryOnReplica(commit, commitTimestamp, txId, partitionId, leaseHolder, attempts));
+ }
- return allOf(cleanupFutures);
+
+ private CompletableFuture<Void> cleanupWithRetryOnReplica(
+ boolean commit,
+ @Nullable HybridTimestamp commitTimestamp,
+ UUID txId,
+ TablePartitionId partitionId,
+ String primaryConsistentId,
+ int attempts) {
+ return txManager.cleanup(primaryConsistentId, partitionId, txId, commit, commitTimestamp)
+ .handle((res, ex) -> {
+ if (ex != null) {
+ LOG.warn("Failed to perform cleanup on Tx {}." + (attempts > 0 ? " The operation will be retried." : ""), txId, ex);
+
+ if (attempts > 0) {
+ return cleanupWithRetry(commit, commitTimestamp, txId, partitionId, attempts - 1);
+ }
+
+ return CompletableFuture.<Void>failedFuture(ex);
+ }
+
+ return CompletableFuture.<Void>completedFuture(null);
+ })
+ .thenCompose(Function.identity());
+ }
+
+ private CompletableFuture<String> findPrimaryReplica(TablePartitionId partitionId, HybridTimestamp now) {
+ return placementDriver.awaitPrimaryReplica(partitionId, now)
+ .orTimeout(AWAIT_PRIMARY_REPLICA_TIMEOUT, TimeUnit.SECONDS)
+ .handle((primaryReplica, e) -> {
+ if (e != null) {
+ LOG.error("Failed to retrieve primary replica for partition {}", partitionId, e);
+
+ throw withCause(TransactionException::new, REPLICA_UNAVAILABLE_ERR,
+ "Failed to get the primary replica"
+ + " [tablePartitionId=" + partitionId + ", awaitTimestamp=" + now + ']', e);
+ }
+
+ return primaryReplica.getLeaseholder();
+ });
}
/**
@@ -1233,7 +1282,7 @@ public class PartitionReplicaListener implements ReplicaListener {
* @return Future to wait of the finish.
*/
private CompletableFuture<Object> finishTransaction(
- List<TablePartitionId> aggregatedGroupIds,
+ Collection<TablePartitionId> aggregatedGroupIds,
UUID txId,
boolean commit,
@Nullable HybridTimestamp commitTimestamp,
@@ -2416,10 +2465,6 @@ public class PartitionReplicaListener implements ReplicaListener {
} else if (request instanceof TxFinishReplicaRequest) {
expectedTerm = ((TxFinishReplicaRequest) request).term();
- assert expectedTerm != null;
- } else if (request instanceof TxCleanupReplicaRequest) {
- expectedTerm = ((TxCleanupReplicaRequest) request).term();
-
assert expectedTerm != null;
} else {
expectedTerm = null;
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/SchemaCompatValidator.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/SchemaCompatValidator.java
index 327457633a..9b37043388 100644
--- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/SchemaCompatValidator.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/SchemaCompatValidator.java
@@ -19,6 +19,7 @@ package org.apache.ignite.internal.table.distributed.replicator;
import static java.util.stream.Collectors.toSet;
+import java.util.Collection;
import java.util.List;
import java.util.Set;
import java.util.UUID;
@@ -58,7 +59,7 @@ class SchemaCompatValidator {
*/
CompletableFuture<CompatValidationResult> validateForward(
UUID txId,
- List<TablePartitionId> enlistedGroupIds,
+ Collection<TablePartitionId> enlistedGroupIds,
@Nullable HybridTimestamp commitTimestamp
) {
HybridTimestamp beginTimestamp = TransactionIds.beginTimestamp(txId);
diff --git a/modules/table/src/test/java/org/apache/ignite/internal/table/RepeatedFinishReadWriteTransactionTest.java b/modules/table/src/test/java/org/apache/ignite/internal/table/RepeatedFinishReadWriteTransactionTest.java
index d36b9d9349..ed114e14d4 100644
--- a/modules/table/src/test/java/org/apache/ignite/internal/table/RepeatedFinishReadWriteTransactionTest.java
+++ b/modules/table/src/test/java/org/apache/ignite/internal/table/RepeatedFinishReadWriteTransactionTest.java
@@ -292,13 +292,13 @@ public class RepeatedFinishReadWriteTransactionTest extends BaseIgniteAbstractTe
@Override
public CompletableFuture<Void> cleanup(
- ClusterNode recipientNode,
- List<IgniteBiTuple<TablePartitionId, Long>> tablePartitionIds,
+ String primaryConsistentId,
+ TablePartitionId tablePartitionId,
UUID txId,
boolean commit,
@Nullable HybridTimestamp commitTimestamp
) {
- return null;
+ return completedFuture(null);
}
@Override
diff --git a/modules/table/src/test/java/org/apache/ignite/internal/table/TxLocalCleanupRecoveryTest.java b/modules/table/src/test/java/org/apache/ignite/internal/table/TxLocalCleanupRecoveryTest.java
new file mode 100644
index 0000000000..6e8e36aae2
--- /dev/null
+++ b/modules/table/src/test/java/org/apache/ignite/internal/table/TxLocalCleanupRecoveryTest.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.table;
+
+import static org.apache.ignite.lang.ErrorGroups.Replicator.REPLICA_COMMON_ERR;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.ignite.internal.replicator.exception.ReplicationException;
+import org.apache.ignite.internal.replicator.message.ReplicaRequest;
+import org.apache.ignite.internal.tx.message.TxCleanupReplicaRequest;
+import org.apache.ignite.tx.TransactionException;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.mockito.invocation.InvocationOnMock;
+
+/**
+ * Durable cleanup test with successfull recovery after the fauilures.
+ */
+public class TxLocalCleanupRecoveryTest extends TxLocalTest {
+
+ private AtomicInteger failureCounter;
+
+ @BeforeEach
+ void initTest() {
+ // The value of 3 is less than the allowed number of cleanup retries.
+ failureCounter = new AtomicInteger(3);
+ }
+
+
+ @Override
+ protected MethodAnswer invokeOnMessagingMock(InvocationOnMock invocationOnMock) {
+ ReplicaRequest request = invocationOnMock.getArgument(1);
+
+ if (request instanceof TxCleanupReplicaRequest && failureCounter.getAndDecrement() > 0) {
+ return new MethodAnswer(CompletableFuture.failedFuture(new ReplicationException(
+ REPLICA_COMMON_ERR,
+ "Test Tx Cleanup exception [replicaGroupId=" + request.groupId() + ']')));
+ }
+ // Otherwise use the parent stub.
+ return super.invokeOnMessagingMock(invocationOnMock);
+ }
+
+
+ @Test
+ @Override
+ public void testDeleteUpsertCommit() throws TransactionException {
+ // The value of 6 is higher than the default retry count.
+ // So we should give up retrying and crash.
+ failureCounter = new AtomicInteger(6);
+
+ assertThrows(TransactionException.class, () -> deleteUpsert().commit());
+ }
+}
diff --git a/modules/table/src/test/java/org/apache/ignite/internal/table/TxLocalTest.java b/modules/table/src/test/java/org/apache/ignite/internal/table/TxLocalTest.java
index 149f965642..bc69c327d2 100644
--- a/modules/table/src/test/java/org/apache/ignite/internal/table/TxLocalTest.java
+++ b/modules/table/src/test/java/org/apache/ignite/internal/table/TxLocalTest.java
@@ -50,9 +50,11 @@ import org.apache.ignite.internal.tx.impl.TxManagerImpl;
import org.apache.ignite.network.ClusterService;
import org.apache.ignite.network.MessagingService;
import org.apache.ignite.table.Table;
+import org.jetbrains.annotations.Nullable;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Disabled;
+import org.mockito.invocation.InvocationOnMock;
/**
* Local table tests.
@@ -81,6 +83,12 @@ public class TxLocalTest extends TxAbstractTest {
Map<ReplicationGroupId, DummyInternalTableImpl> tables = new HashMap<>();
doAnswer(invocationOnMock -> {
+ MethodAnswer answer = invokeOnMessagingMock(invocationOnMock);
+
+ if (answer.hasAnswer) {
+ return answer.answer;
+ }
+
ReplicaRequest request = invocationOnMock.getArgument(1);
ReplicaListener replicaListener = tables.get(request.groupId()).getReplicaListener();
@@ -151,6 +159,10 @@ public class TxLocalTest extends TxAbstractTest {
tables.put(table2.groupId(), table2);
}
+ protected MethodAnswer invokeOnMessagingMock(InvocationOnMock invocationOnMock) {
+ return MethodAnswer.NO_ANSWER;
+ }
+
@AfterEach
public void tearDown() throws Exception {
if (txManager != null) {
@@ -188,4 +200,26 @@ public class TxLocalTest extends TxAbstractTest {
protected Collection<TxManager> txManagers() {
return List.of(txManager);
}
+
+ /**
+ * A class that represents the result of method execution.
+ */
+ protected static class MethodAnswer {
+ static final MethodAnswer NO_ANSWER = new MethodAnswer(null, false);
+
+ /** The result of method invocation. */
+ final @Nullable Object answer;
+
+ /** If {@code true} the method was called. */
+ final boolean hasAnswer;
+
+ public MethodAnswer(@Nullable Object answer, boolean hasAnswer) {
+ this.answer = answer;
+ this.hasAnswer = hasAnswer;
+ }
+
+ public MethodAnswer(@Nullable Object answer) {
+ this(answer, true);
+ }
+ }
}
diff --git a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerTest.java b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerTest.java
index a921284989..abc281d7a4 100644
--- a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerTest.java
+++ b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerTest.java
@@ -1276,7 +1276,6 @@ public class PartitionReplicaListenerTest extends IgniteAbstractTest {
.txId(txId)
.commit(true)
.commitTimestampLong(now.longValue())
- .term(1L)
.build(),
localNode.id()
);
@@ -2025,7 +2024,6 @@ public class PartitionReplicaListenerTest extends IgniteAbstractTest {
.txId(txId)
.commit(true)
.commitTimestampLong(commitTs.longValue())
- .term(1L)
.build(),
localNode.id()
).join();
diff --git a/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/TxAbstractTest.java b/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/TxAbstractTest.java
index 69cc7409ef..d5f9fa98d1 100644
--- a/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/TxAbstractTest.java
+++ b/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/TxAbstractTest.java
@@ -161,7 +161,7 @@ public abstract class TxAbstractTest extends IgniteAbstractTest {
assertEquals(100., accounts.recordView().get(null, makeKey(1)).doubleValue("balance"));
}
- private InternalTransaction deleteUpsert() {
+ protected InternalTransaction deleteUpsert() {
accounts.recordView().upsert(null, makeValue(1, 100.));
InternalTransaction tx = (InternalTransaction) igniteTransactions.begin();
diff --git a/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/impl/DummyInternalTableImpl.java b/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/impl/DummyInternalTableImpl.java
index f7efbf495e..33facb20be 100644
--- a/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/impl/DummyInternalTableImpl.java
+++ b/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/impl/DummyInternalTableImpl.java
@@ -21,6 +21,7 @@ import static java.util.concurrent.CompletableFuture.completedFuture;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Mockito.lenient;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
@@ -257,6 +258,14 @@ public class DummyInternalTableImpl extends InternalTableImpl {
return replicaListener.invoke(invocationOnMock.getArgument(1), node.id());
})
.when(replicaSvc).invoke(any(ClusterNode.class), any());
+
+ lenient()
+ .doAnswer(invocationOnMock -> {
+ String nodeId = invocationOnMock.getArgument(0);
+
+ return replicaListener.invoke(invocationOnMock.getArgument(1), nodeId);
+ })
+ .when(replicaSvc).invoke(anyString(), any());
}
AtomicLong raftIndex = new AtomicLong();
diff --git a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/TxManager.java b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/TxManager.java
index c0fbd57a02..280cc91e74 100644
--- a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/TxManager.java
+++ b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/TxManager.java
@@ -128,16 +128,16 @@ public interface TxManager extends IgniteComponent {
/**
* Sends cleanup request to the specified primary replica.
*
- * @param recipientNode Primary replica to process given cleanup request.
- * @param tablePartitionIds Table partition ids with raft terms.
+ * @param primaryConsistentId A consistent id of the primary replica node.
+ * @param tablePartitionId Table partition id.
* @param txId Transaction id.
* @param commit {@code True} if a commit requested.
* @param commitTimestamp Commit timestamp ({@code null} if it's an abort).
* @return Completable future of Void.
*/
CompletableFuture<Void> cleanup(
- ClusterNode recipientNode,
- List<IgniteBiTuple<TablePartitionId, Long>> tablePartitionIds,
+ String primaryConsistentId,
+ TablePartitionId tablePartitionId,
UUID txId,
boolean commit,
@Nullable HybridTimestamp commitTimestamp
diff --git a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/ReadWriteTransactionImpl.java b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/ReadWriteTransactionImpl.java
index e191f8ca2d..f26090ab2f 100644
--- a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/ReadWriteTransactionImpl.java
+++ b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/ReadWriteTransactionImpl.java
@@ -120,22 +120,14 @@ public class ReadWriteTransactionImpl extends IgniteAbstractTransactionImpl {
Map<ClusterNode, List<IgniteBiTuple<TablePartitionId, Long>>> groups = new LinkedHashMap<>();
if (!enlisted.isEmpty()) {
- enlisted.forEach((groupId, groupMeta) -> {
- ClusterNode recipientNode = groupMeta.get1();
+ enlisted.forEach((groupId, groupMeta) ->
+ groups.computeIfAbsent(groupMeta.get1(), clusterNode -> new ArrayList<>())
+ .add(new IgniteBiTuple<>(groupId, groupMeta.get2())));
- if (groups.containsKey(recipientNode)) {
- groups.get(recipientNode).add(new IgniteBiTuple<>(groupId, groupMeta.get2()));
- } else {
- List<IgniteBiTuple<TablePartitionId, Long>> items = new ArrayList<>();
+ IgniteBiTuple<ClusterNode, Long> nodeAndTerm = enlisted.get(commitPart);
- items.add(new IgniteBiTuple<>(groupId, groupMeta.get2()));
-
- groups.put(recipientNode, items);
- }
- });
-
- ClusterNode recipientNode = enlisted.get(commitPart).get1();
- Long term = enlisted.get(commitPart).get2();
+ ClusterNode recipientNode = nodeAndTerm.get1();
+ Long term = nodeAndTerm.get2();
LOG.debug("Finish [recipientNode={}, term={} commit={}, txId={}, groups={}",
recipientNode, term, commit, id(), groups);
diff --git a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxManagerImpl.java b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxManagerImpl.java
index ca96d6cb3a..f8adbe49a4 100644
--- a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxManagerImpl.java
+++ b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxManagerImpl.java
@@ -294,51 +294,43 @@ public class TxManagerImpl implements TxManager {
.build();
return replicaService.invoke(recipientNode, req)
- .thenRun(() -> {
- updateTxMeta(txId, old -> {
- if (isFinalState(old.txState())) {
- finishingStateMeta.txFinishFuture().complete(old);
+ .thenRun(() ->
+ updateTxMeta(txId, old -> {
+ if (isFinalState(old.txState())) {
+ finishingStateMeta.txFinishFuture().complete(old);
- return old;
- }
+ return old;
+ }
- assert old instanceof TxStateMetaFinishing;
+ assert old instanceof TxStateMetaFinishing;
- TxStateMeta finalTxStateMeta = coordinatorFinalTxStateMeta(commit, commitTimestamp);
+ TxStateMeta finalTxStateMeta = coordinatorFinalTxStateMeta(commit, commitTimestamp);
- finishingStateMeta.txFinishFuture().complete(finalTxStateMeta);
+ finishingStateMeta.txFinishFuture().complete(finalTxStateMeta);
- return finalTxStateMeta;
- });
- });
+ return finalTxStateMeta;
+ })
+ );
}
@Override
public CompletableFuture<Void> cleanup(
- ClusterNode recipientNode,
- List<IgniteBiTuple<TablePartitionId, Long>> tablePartitionIds,
+ String primaryConsistentId,
+ TablePartitionId tablePartitionId,
UUID txId,
boolean commit,
@Nullable HybridTimestamp commitTimestamp
) {
- var cleanupFutures = new CompletableFuture[tablePartitionIds.size()];
-
- // TODO: https://issues.apache.org/jira/browse/IGNITE-17582 Grouping replica requests.
- for (int i = 0; i < tablePartitionIds.size(); i++) {
- cleanupFutures[i] = replicaService.invoke(
- recipientNode,
- FACTORY.txCleanupReplicaRequest()
- .groupId(tablePartitionIds.get(i).get1())
- .timestampLong(clock.nowLong())
- .txId(txId)
- .commit(commit)
- .commitTimestampLong(hybridTimestampToLong(commitTimestamp))
- .term(tablePartitionIds.get(i).get2())
- .build()
- );
- }
-
- return allOf(cleanupFutures);
+ return replicaService.invoke(
+ primaryConsistentId,
+ FACTORY.txCleanupReplicaRequest()
+ .groupId(tablePartitionId)
+ .timestampLong(clock.nowLong())
+ .txId(txId)
+ .commit(commit)
+ .commitTimestampLong(hybridTimestampToLong(commitTimestamp))
+ .build()
+ );
}
@Override
diff --git a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/message/TxCleanupReplicaRequest.java b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/message/TxCleanupReplicaRequest.java
index d5503e71cf..2032cf1352 100644
--- a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/message/TxCleanupReplicaRequest.java
+++ b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/message/TxCleanupReplicaRequest.java
@@ -34,7 +34,7 @@ import org.jetbrains.annotations.Nullable;
* <li>Release all locks that were held on local replica by given transaction.</li>
* </ol>
*/
-@Transferable(value = TxMessageGroup.TX_CLEANUP_REQUEST)
+@Transferable(TxMessageGroup.TX_CLEANUP_REQUEST)
public interface TxCleanupReplicaRequest extends ReplicaRequest, TimestampAware {
/**
* Returns transaction Id.
@@ -65,13 +65,4 @@ public interface TxCleanupReplicaRequest extends ReplicaRequest, TimestampAware
default @Nullable HybridTimestamp commitTimestamp() {
return nullableHybridTimestamp(commitTimestampLong());
}
-
- /**
- * Gets a raft term.
- * TODO: A temp solution until lease-based engine will be implemented (IGNITE-17256, IGNITE-15083)
- *
- * @return Raft term.
- */
- @Deprecated
- Long term();
}