You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sa...@apache.org on 2024/02/05 08:23:23 UTC
(ignite-3) branch main updated: IGNITE-21371 Add tx coordinator id to TX requests (#3128)
This is an automated email from the ASF dual-hosted git repository.
sanpwc pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new 8880aa1522 IGNITE-21371 Add tx coordinator id to TX requests (#3128)
8880aa1522 is described below
commit 8880aa152275f4a4e770efe9b35c8f9c7da6b8d2
Author: Cyrill <cy...@gmail.com>
AuthorDate: Mon Feb 5 11:23:17 2024 +0300
IGNITE-21371 Add tx coordinator id to TX requests (#3128)
---
.../ignite/client/fakes/FakeInternalTable.java | 2 +
.../apache/ignite/client/fakes/FakeTxManager.java | 5 +
.../ItPrimaryReplicaChoiceTest.java | 1 +
.../ignite/internal/table/ItTableScanTest.java | 80 ++++++++++++++--
.../sql/engine/exec/ScannableTableImpl.java | 3 +
.../internal/sql/engine/exec/TxAttributes.java | 22 ++++-
.../sql/engine/exec/UpdatableTableImpl.java | 3 +
.../engine/exec/rel/ScannableTableSelfTest.java | 10 ++
.../sql/engine/framework/NoOpTransaction.java | 5 +
.../ItInternalTableReadWriteScanTest.java | 2 +-
.../ignite/distributed/ItTxStateLocalMapTest.java | 2 -
.../ignite/distributed/ReplicaUnavailableTest.java | 1 +
.../ignite/internal/table/InternalTable.java | 4 +
.../internal/table/distributed/TableManager.java | 12 ++-
.../table/distributed/command/FinishTxCommand.java | 4 -
.../command/WriteIntentSwitchCommand.java | 5 -
.../table/distributed/raft/PartitionListener.java | 8 +-
.../replication/request/CommittableTxRequest.java | 41 --------
.../request/ReadWriteMultiRowPkReplicaRequest.java | 2 +-
.../request/ReadWriteMultiRowReplicaRequest.java | 2 +-
.../request/ReadWriteReplicaRequest.java | 20 ++++
.../ReadWriteScanRetrieveBatchReplicaRequest.java | 3 +-
.../ReadWriteSingleRowPkReplicaRequest.java | 2 +-
.../request/ReadWriteSingleRowReplicaRequest.java | 2 +-
.../request/ReadWriteSwapRowReplicaRequest.java | 2 +-
.../replicator/PartitionReplicaListener.java | 104 ++++++---------------
.../distributed/storage/InternalTableImpl.java | 46 ++++++++-
.../PartitionRaftCommandsSerializationTest.java | 4 -
.../raft/PartitionCommandListenerTest.java | 6 --
.../PartitionReplicaListenerIndexLockingTest.java | 13 ++-
.../replication/PartitionReplicaListenerTest.java | 20 ++++
.../PartitionCommandsMarshallerImplTest.java | 1 -
.../ignite/internal/tx/InternalTransaction.java | 7 ++
.../tx/impl/IgniteAbstractTransactionImpl.java | 19 +++-
.../internal/tx/impl/ReadOnlyTransactionImpl.java | 4 +-
.../internal/tx/impl/ReadWriteTransactionImpl.java | 10 +-
.../ignite/internal/tx/impl/TxManagerImpl.java | 4 +-
.../ignite/internal/tx/impl/TxMessageSender.java | 14 +--
.../tx/message/TxFinishReplicaRequest.java | 2 +-
.../tx/impl/ReadOnlyTransactionImplTest.java | 2 +-
.../tx/impl/ReadWriteTransactionImplTest.java | 4 +-
41 files changed, 320 insertions(+), 183 deletions(-)
diff --git a/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeInternalTable.java b/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeInternalTable.java
index ff0804d97f..39731f6565 100644
--- a/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeInternalTable.java
+++ b/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeInternalTable.java
@@ -370,6 +370,7 @@ public class FakeInternalTable implements InternalTable {
int partId,
UUID txId,
TablePartitionId commitPartition,
+ String txCoordinatorId,
PrimaryReplica recipient,
@Nullable Integer indexId,
@Nullable BinaryTuplePrefix lowerBound,
@@ -407,6 +408,7 @@ public class FakeInternalTable implements InternalTable {
int partId,
UUID txId,
TablePartitionId commitPartition,
+ String txCoordinatorId,
PrimaryReplica recipient,
int indexId,
BinaryTuple key,
diff --git a/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeTxManager.java b/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeTxManager.java
index b5f4bb1ffd..aeafe657c6 100644
--- a/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeTxManager.java
+++ b/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeTxManager.java
@@ -135,6 +135,11 @@ public class FakeTxManager implements TxManager {
return readOnly;
}
+ @Override
+ public String coordinatorId() {
+ return null;
+ }
+
@Override
public HybridTimestamp readTimestamp() {
return tracker.get();
diff --git a/modules/placement-driver/src/integrationTest/java/org/apache/ignite/internal/placementdriver/ItPrimaryReplicaChoiceTest.java b/modules/placement-driver/src/integrationTest/java/org/apache/ignite/internal/placementdriver/ItPrimaryReplicaChoiceTest.java
index 7066eaca89..f8d8e7b54f 100644
--- a/modules/placement-driver/src/integrationTest/java/org/apache/ignite/internal/placementdriver/ItPrimaryReplicaChoiceTest.java
+++ b/modules/placement-driver/src/integrationTest/java/org/apache/ignite/internal/placementdriver/ItPrimaryReplicaChoiceTest.java
@@ -307,6 +307,7 @@ public class ItPrimaryReplicaChoiceTest extends ClusterPerTestIntegrationTest {
PART_ID,
rwTx.id(),
rwTx.commitPartition(),
+ rwTx.coordinatorId(),
new PrimaryReplica(primaryNode, primaryReplicaFut.get().getStartTime().longValue()),
idxId,
exactKey,
diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/table/ItTableScanTest.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/table/ItTableScanTest.java
index 3ca9b399b8..e1b2b18a85 100644
--- a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/table/ItTableScanTest.java
+++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/table/ItTableScanTest.java
@@ -188,7 +188,18 @@ public class ItTableScanTest extends BaseSqlIntegrationTest {
Publisher<BinaryRow> publisher = new RollbackTxOnErrorPublisher<>(
tx1,
- internalTable.scan(PART_ID, tx1.id(), tx1.commitPartition(), recipient, sortedIndexId, null, null, 0, null)
+ internalTable.scan(
+ PART_ID,
+ tx1.id(),
+ tx1.commitPartition(),
+ tx1.coordinatorId(),
+ recipient,
+ sortedIndexId,
+ null,
+ null,
+ 0,
+ null
+ )
);
CompletableFuture<Void> scanned = new CompletableFuture<>();
@@ -464,7 +475,18 @@ public class ItTableScanTest extends BaseSqlIntegrationTest {
Publisher<BinaryRow> publisher = new RollbackTxOnErrorPublisher<>(
tx,
- internalTable.scan(PART_ID, tx.id(), tx.commitPartition(), recipient, sortedIndexId, null, null, 0, null)
+ internalTable.scan(
+ PART_ID,
+ tx.id(),
+ tx.commitPartition(),
+ tx.coordinatorId(),
+ recipient,
+ sortedIndexId,
+ null,
+ null,
+ 0,
+ null
+ )
);
CompletableFuture<Void> scanned = new CompletableFuture<>();
@@ -492,7 +514,18 @@ public class ItTableScanTest extends BaseSqlIntegrationTest {
Publisher<BinaryRow> publisher1 = new RollbackTxOnErrorPublisher<>(
tx,
- internalTable.scan(PART_ID, tx.id(), tx.commitPartition(), recipient, sortedIndexId, null, null, 0, null)
+ internalTable.scan(
+ PART_ID,
+ tx.id(),
+ tx.commitPartition(),
+ tx.coordinatorId(),
+ recipient,
+ sortedIndexId,
+ null,
+ null,
+ 0,
+ null
+ )
);
assertEquals(scanAllRows(publisher1).size(), scannedRows.size());
@@ -524,6 +557,7 @@ public class ItTableScanTest extends BaseSqlIntegrationTest {
PART_ID,
tx.id(),
tx.commitPartition(),
+ tx.coordinatorId(),
recipient,
soredIndexId,
lowBound,
@@ -551,6 +585,7 @@ public class ItTableScanTest extends BaseSqlIntegrationTest {
PART_ID,
tx.id(),
tx.commitPartition(),
+ tx.coordinatorId(),
recipient,
soredIndexId,
lowBound,
@@ -605,7 +640,18 @@ public class ItTableScanTest extends BaseSqlIntegrationTest {
Publisher<BinaryRow> publisher = new RollbackTxOnErrorPublisher<>(
tx,
- internalTable.scan(PART_ID, tx.id(), tx.commitPartition(), recipient, sortedIndexId, null, null, 0, null)
+ internalTable.scan(
+ PART_ID,
+ tx.id(),
+ tx.commitPartition(),
+ tx.coordinatorId(),
+ recipient,
+ sortedIndexId,
+ null,
+ null,
+ 0,
+ null
+ )
);
// Non-thread-safe collection is fine, HB is guaranteed by "Thread#join" inside of "runRace".
@@ -631,7 +677,18 @@ public class ItTableScanTest extends BaseSqlIntegrationTest {
Publisher<BinaryRow> publisher1 = new RollbackTxOnErrorPublisher<>(
tx,
- internalTable.scan(PART_ID, tx.id(), tx.commitPartition(), recipient, sortedIndexId, null, null, 0, null)
+ internalTable.scan(
+ PART_ID,
+ tx.id(),
+ tx.commitPartition(),
+ tx.coordinatorId(),
+ recipient,
+ sortedIndexId,
+ null,
+ null,
+ 0,
+ null
+ )
);
assertEquals(scanAllRows(publisher1).size(), scannedRows.size());
@@ -744,7 +801,18 @@ public class ItTableScanTest extends BaseSqlIntegrationTest {
publisher = new RollbackTxOnErrorPublisher<>(
tx,
- internalTable.scan(PART_ID, tx.id(), tx.commitPartition(), recipient, sortedIndexId, null, null, 0, null)
+ internalTable.scan(
+ PART_ID,
+ tx.id(),
+ tx.commitPartition(),
+ tx.coordinatorId(),
+ recipient,
+ sortedIndexId,
+ null,
+ null,
+ 0,
+ null
+ )
);
}
diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ScannableTableImpl.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ScannableTableImpl.java
index 9fd9f76e74..c4f4b359db 100644
--- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ScannableTableImpl.java
+++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ScannableTableImpl.java
@@ -74,6 +74,7 @@ public class ScannableTableImpl implements ScannableTable {
partWithConsistencyToken.partId(),
txAttributes.id(),
txAttributes.commitPartition(),
+ txAttributes.coordinatorId(),
recipient,
null,
null,
@@ -140,6 +141,7 @@ public class ScannableTableImpl implements ScannableTable {
partWithConsistencyToken.partId(),
txAttributes.id(),
txAttributes.commitPartition(),
+ txAttributes.coordinatorId(),
new PrimaryReplica(ctx.localNode(), partWithConsistencyToken.enlistmentConsistencyToken()),
indexId,
lower,
@@ -192,6 +194,7 @@ public class ScannableTableImpl implements ScannableTable {
partWithConsistencyToken.partId(),
txAttributes.id(),
txAttributes.commitPartition(),
+ txAttributes.coordinatorId(),
new PrimaryReplica(ctx.localNode(), partWithConsistencyToken.enlistmentConsistencyToken()),
indexId,
keyTuple,
diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/TxAttributes.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/TxAttributes.java
index ec936eac51..0c44a459c1 100644
--- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/TxAttributes.java
+++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/TxAttributes.java
@@ -36,6 +36,7 @@ public class TxAttributes implements Serializable {
private static final long serialVersionUID = 3933878724800694086L;
private final UUID id;
+ private final String coordinatorId;
private final boolean readOnly;
private final @Nullable HybridTimestamp readTimestamp;
private final @Nullable TablePartitionId commitPartition;
@@ -55,18 +56,20 @@ public class TxAttributes implements Serializable {
throw new IllegalArgumentException("Read time is not set for RO transaction");
}
- return new TxAttributes(tx.id(), readTime);
+ return new TxAttributes(tx.id(), readTime, tx.coordinatorId());
}
- return new TxAttributes(tx.id(), tx.commitPartition());
+ return new TxAttributes(tx.id(), tx.commitPartition(), tx.coordinatorId());
}
private TxAttributes(
UUID id,
- HybridTimestamp readTimestamp
+ HybridTimestamp readTimestamp,
+ String coordinatorId
) {
this.id = Objects.requireNonNull(id, "id");
this.readTimestamp = Objects.requireNonNull(readTimestamp, "timestamp");
+ this.coordinatorId = Objects.requireNonNull(coordinatorId, "tx coordinator id");
this.readOnly = true;
this.commitPartition = null;
@@ -74,10 +77,12 @@ public class TxAttributes implements Serializable {
private TxAttributes(
UUID id,
- @Nullable TablePartitionId commitPartitionId
+ @Nullable TablePartitionId commitPartitionId,
+ String coordinatorId
) {
this.id = Objects.requireNonNull(id, "id");
this.commitPartition = commitPartitionId;
+ this.coordinatorId = Objects.requireNonNull(coordinatorId, "tx coordinator id");
this.readOnly = false;
this.readTimestamp = null;
@@ -111,6 +116,15 @@ public class TxAttributes implements Serializable {
return readTimestamp;
}
+ /**
+ * Get the transaction coordinator inconsistent ID.
+ *
+ * @return Transaction coordinator inconsistent ID.
+ */
+ public String coordinatorId() {
+ return coordinatorId;
+ }
+
/** Returns {@code true} if this is RO transaction. */
public boolean readOnly() {
return readOnly;
diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/UpdatableTableImpl.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/UpdatableTableImpl.java
index f1149c3684..58c112d939 100644
--- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/UpdatableTableImpl.java
+++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/UpdatableTableImpl.java
@@ -131,6 +131,7 @@ public final class UpdatableTableImpl implements UpdatableTable {
.requestType(RequestType.RW_UPSERT_ALL)
.timestampLong(clock.nowLong())
.skipDelayedAck(true)
+ .coordinatorId(txAttributes.coordinatorId())
.build();
futures[batchNum++] = replicaService.invoke(nodeWithConsistencyToken.name(), request);
@@ -203,6 +204,7 @@ public final class UpdatableTableImpl implements UpdatableTable {
.requestType(RequestType.RW_INSERT_ALL)
.timestampLong(clock.nowLong())
.skipDelayedAck(true)
+ .coordinatorId(txAttributes.coordinatorId())
.build();
rowBatch.resultFuture = replicaService.invoke(nodeWithConsistencyToken.name(), request);
@@ -270,6 +272,7 @@ public final class UpdatableTableImpl implements UpdatableTable {
.requestType(RequestType.RW_DELETE_ALL)
.timestampLong(clock.nowLong())
.skipDelayedAck(true)
+ .coordinatorId(txAttributes.coordinatorId())
.build();
futures[batchNum++] = replicaService.invoke(nodeWithConsistencyToken.name(), request);
diff --git a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/ScannableTableSelfTest.java b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/ScannableTableSelfTest.java
index 8007e40503..658689c7e9 100644
--- a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/ScannableTableSelfTest.java
+++ b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/ScannableTableSelfTest.java
@@ -27,6 +27,7 @@ import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.fail;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyInt;
+import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.ArgumentMatchers.isNull;
import static org.mockito.ArgumentMatchers.nullable;
@@ -134,6 +135,7 @@ public class ScannableTableSelfTest extends BaseIgniteAbstractTest {
partitionId,
tx.id(),
tx.commitPartition(),
+ tx.coordinatorId(),
new PrimaryReplica(clusterNode, consistencyToken),
null,
null,
@@ -220,6 +222,7 @@ public class ScannableTableSelfTest extends BaseIgniteAbstractTest {
eq(partitionId),
eq(tx.id()),
eq(tx.commitPartition()),
+ anyString(),
eq(primaryReplica),
eq(indexId),
condition.lowerValue != null ? any(BinaryTuplePrefix.class) : isNull(),
@@ -293,6 +296,7 @@ public class ScannableTableSelfTest extends BaseIgniteAbstractTest {
eq(partitionId),
eq(tx.id()),
eq(tx.commitPartition()),
+ anyString(),
eq(primaryReplica),
eq(indexId),
nullable(BinaryTuplePrefix.class),
@@ -412,6 +416,7 @@ public class ScannableTableSelfTest extends BaseIgniteAbstractTest {
eq(partitionId),
eq(tx.id()),
eq(tx.commitPartition()),
+ anyString(),
eq(primaryReplica),
eq(indexId),
prefix.capture(),
@@ -464,6 +469,7 @@ public class ScannableTableSelfTest extends BaseIgniteAbstractTest {
eq(partitionId),
eq(tx.id()),
any(),
+ anyString(),
eq(primaryReplica),
eq(indexId),
any(BinaryTuple.class),
@@ -514,6 +520,7 @@ public class ScannableTableSelfTest extends BaseIgniteAbstractTest {
eq(partitionId),
eq(tx.id()),
any(),
+ anyString(),
eq(primaryReplica),
eq(indexId),
any(BinaryTuple.class),
@@ -590,6 +597,7 @@ public class ScannableTableSelfTest extends BaseIgniteAbstractTest {
anyInt(),
any(UUID.class),
any(TablePartitionId.class),
+ any(String.class),
any(PrimaryReplica.class),
isNull(),
isNull(),
@@ -636,6 +644,7 @@ public class ScannableTableSelfTest extends BaseIgniteAbstractTest {
anyInt(),
any(UUID.class),
any(TablePartitionId.class),
+ any(String.class),
any(PrimaryReplica.class),
any(Integer.class),
nullable(BinaryTuplePrefix.class),
@@ -681,6 +690,7 @@ public class ScannableTableSelfTest extends BaseIgniteAbstractTest {
anyInt(),
any(UUID.class),
any(TablePartitionId.class),
+ any(String.class),
any(PrimaryReplica.class),
any(Integer.class),
nullable(BinaryTuple.class),
diff --git a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/NoOpTransaction.java b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/NoOpTransaction.java
index 16432d5a31..4ad153c175 100644
--- a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/NoOpTransaction.java
+++ b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/NoOpTransaction.java
@@ -130,6 +130,11 @@ public final class NoOpTransaction implements InternalTransaction {
return id;
}
+ @Override
+ public String coordinatorId() {
+ return clusterNode().id();
+ }
+
@Override
public IgniteBiTuple<ClusterNode, Long> enlistedNodeAndConsistencyToken(TablePartitionId tablePartitionId) {
return tuple;
diff --git a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItInternalTableReadWriteScanTest.java b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItInternalTableReadWriteScanTest.java
index 1acf55c319..4aac403106 100644
--- a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItInternalTableReadWriteScanTest.java
+++ b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItInternalTableReadWriteScanTest.java
@@ -50,7 +50,7 @@ public class ItInternalTableReadWriteScanTest extends ItAbstractInternalTableSca
return new RollbackTxOnErrorPublisher<>(
tx,
- internalTbl.scan(part, tx.id(), tx.commitPartition(), recipient, null, null, null, 0, null)
+ internalTbl.scan(part, tx.id(), tx.commitPartition(), tx.coordinatorId(), recipient, null, null, null, 0, null)
);
}
diff --git a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTxStateLocalMapTest.java b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTxStateLocalMapTest.java
index 28825e1eaa..36384aea0e 100644
--- a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTxStateLocalMapTest.java
+++ b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTxStateLocalMapTest.java
@@ -25,7 +25,6 @@ import static org.apache.ignite.internal.tx.TxState.PENDING;
import static org.junit.jupiter.api.Assertions.assertTrue;
import java.util.List;
-import java.util.Objects;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
@@ -188,7 +187,6 @@ public class ItTxStateLocalMapTest extends IgniteAbstractTest {
}
return (expected.txState() == meta.get().txState()
- && Objects.equals(expected.txCoordinatorId(), meta.get().txCoordinatorId())
&& checkTimestamps(expected.commitTimestamp(), meta.get().commitTimestamp()));
}, 5_000));
} catch (InterruptedException e) {
diff --git a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ReplicaUnavailableTest.java b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ReplicaUnavailableTest.java
index 9cc1d4266a..8a55df9419 100644
--- a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ReplicaUnavailableTest.java
+++ b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ReplicaUnavailableTest.java
@@ -207,6 +207,7 @@ public class ReplicaUnavailableTest extends IgniteAbstractTest {
.binaryTuple(binaryRow.tupleSlice())
.requestType(RequestType.RW_GET)
.enlistmentConsistencyToken(1L)
+ .coordinatorId(clusterService.topologyService().localMember().id())
.build();
}
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/InternalTable.java b/modules/table/src/main/java/org/apache/ignite/internal/table/InternalTable.java
index eae46725ee..ad5bc18e0a 100644
--- a/modules/table/src/main/java/org/apache/ignite/internal/table/InternalTable.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/InternalTable.java
@@ -340,6 +340,7 @@ public interface InternalTable extends ManuallyCloseable {
* @param partId The partition.
* @param txId Transaction id.
* @param commitPartition Commit partition id.
+ * @param txCoordinatorId Transaction coordinator id.
* @param recipient Primary replica that will handle given get request.
* @param lowerBound Lower search bound.
* @param upperBound Upper search bound.
@@ -351,6 +352,7 @@ public interface InternalTable extends ManuallyCloseable {
int partId,
UUID txId,
TablePartitionId commitPartition,
+ String txCoordinatorId,
PrimaryReplica recipient,
@Nullable Integer indexId,
@Nullable BinaryTuplePrefix lowerBound,
@@ -408,6 +410,7 @@ public interface InternalTable extends ManuallyCloseable {
* @param partId The partition.
* @param txId Transaction id.
* @param commitPartition Commit partition id.
+ * @param txCoordinatorId Transaction coordinator id.
* @param recipient Primary replica that will handle given get request.
* @param indexId Index id.
* @param key Key to search.
@@ -418,6 +421,7 @@ public interface InternalTable extends ManuallyCloseable {
int partId,
UUID txId,
TablePartitionId commitPartition,
+ String txCoordinatorId,
PrimaryReplica recipient,
int indexId,
BinaryTuple key,
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
index dc292c6898..b98720a63d 100644
--- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
@@ -1219,8 +1219,16 @@ public class TableManager implements IgniteTablesInternal, IgniteComponent {
tableName,
tableId,
new Int2ObjectOpenHashMap<>(partitions),
- partitions, clusterService.topologyService(), txManager, tableStorage,
- txStateStorage, replicaSvc, clock, observableTimestampTracker, placementDriver);
+ partitions,
+ clusterService.topologyService(),
+ txManager,
+ tableStorage,
+ txStateStorage,
+ replicaSvc,
+ clock,
+ observableTimestampTracker,
+ placementDriver
+ );
var table = new TableImpl(internalTable, lockMgr, schemaVersions, sql.get());
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/command/FinishTxCommand.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/command/FinishTxCommand.java
index 32dc895cd4..4a631027c0 100644
--- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/command/FinishTxCommand.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/command/FinishTxCommand.java
@@ -52,8 +52,4 @@ public interface FinishTxCommand extends PartitionCommand {
*/
List<TablePartitionIdMessage> tablePartitionIds();
- /**
- * Transaction coordinator id.
- */
- String txCoordinatorId();
}
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/command/WriteIntentSwitchCommand.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/command/WriteIntentSwitchCommand.java
index cc01cf3179..6748f32342 100644
--- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/command/WriteIntentSwitchCommand.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/command/WriteIntentSwitchCommand.java
@@ -45,9 +45,4 @@ public interface WriteIntentSwitchCommand extends PartitionCommand {
default @Nullable HybridTimestamp commitTimestamp() {
return nullableHybridTimestamp(commitTimestampLong());
}
-
- /**
- * Transaction coordinator id.
- */
- String txCoordinatorId();
}
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionListener.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionListener.java
index 6d378d8caf..a62550a338 100644
--- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionListener.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionListener.java
@@ -348,7 +348,7 @@ public class PartitionListener implements RaftGroupListener, BeforeApplyHandler
commandTerm
);
- markFinished(txId, cmd.commit(), cmd.commitTimestamp(), cmd.txCoordinatorId());
+ markFinished(txId, cmd.commit(), cmd.commitTimestamp());
LOG.debug("Finish the transaction txId = {}, state = {}, txStateChangeRes = {}", txId, txMetaToSet, txStateChangeRes);
@@ -375,7 +375,7 @@ public class PartitionListener implements RaftGroupListener, BeforeApplyHandler
UUID txId = cmd.txId();
- markFinished(txId, cmd.commit(), cmd.commitTimestamp(), cmd.txCoordinatorId());
+ markFinished(txId, cmd.commit(), cmd.commitTimestamp());
storageUpdateHandler.switchWriteIntents(txId, cmd.commit(), cmd.commitTimestamp(),
() -> storage.lastApplied(commandIndex, commandTerm));
@@ -596,10 +596,10 @@ public class PartitionListener implements RaftGroupListener, BeforeApplyHandler
));
}
- private void markFinished(UUID txId, boolean commit, @Nullable HybridTimestamp commitTimestamp, String txCoordinatorId) {
+ private void markFinished(UUID txId, boolean commit, @Nullable HybridTimestamp commitTimestamp) {
txManager.updateTxMeta(txId, old -> new TxStateMeta(
commit ? COMMITTED : ABORTED,
- txCoordinatorId,
+ old == null ? null : old.txCoordinatorId(),
old == null ? null : old.commitPartitionId(),
commit ? commitTimestamp : null
));
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replication/request/CommittableTxRequest.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replication/request/CommittableTxRequest.java
deleted file mode 100644
index c90badb72b..0000000000
--- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replication/request/CommittableTxRequest.java
+++ /dev/null
@@ -1,41 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.table.distributed.replication.request;
-
-import java.util.UUID;
-import org.apache.ignite.internal.replicator.message.ReplicaRequest;
-import org.apache.ignite.internal.table.distributed.command.TablePartitionIdMessage;
-
-/**
- * Transaction request that can contain full transaction (transaction that contains full set of keys).
- */
-public interface CommittableTxRequest extends ReplicaRequest {
- UUID transactionId();
-
- /**
- * Return {@code true} if this is a full transaction.
- */
- boolean full();
-
- /**
- * Gets a commit partition id.
- *
- * @return Table partition id.
- */
- TablePartitionIdMessage commitPartitionId();
-}
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replication/request/ReadWriteMultiRowPkReplicaRequest.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replication/request/ReadWriteMultiRowPkReplicaRequest.java
index 7d738ad9e0..2d12a888aa 100644
--- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replication/request/ReadWriteMultiRowPkReplicaRequest.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replication/request/ReadWriteMultiRowPkReplicaRequest.java
@@ -24,7 +24,7 @@ import org.apache.ignite.internal.table.distributed.TableMessageGroup;
* Read-write multi-row replica request involving table's Primary Keys.
*/
@Transferable(TableMessageGroup.RW_MULTI_ROW_PK_REPLICA_REQUEST)
-public interface ReadWriteMultiRowPkReplicaRequest extends MultipleRowPkReplicaRequest, ReadWriteReplicaRequest, CommittableTxRequest {
+public interface ReadWriteMultiRowPkReplicaRequest extends MultipleRowPkReplicaRequest, ReadWriteReplicaRequest {
/**
* Disable delayed ack optimization.
*
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replication/request/ReadWriteMultiRowReplicaRequest.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replication/request/ReadWriteMultiRowReplicaRequest.java
index a94643776e..f2476ca46b 100644
--- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replication/request/ReadWriteMultiRowReplicaRequest.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replication/request/ReadWriteMultiRowReplicaRequest.java
@@ -24,7 +24,7 @@ import org.apache.ignite.internal.table.distributed.TableMessageGroup;
* Read-write multi-row replica request.
*/
@Transferable(TableMessageGroup.RW_MULTI_ROW_REPLICA_REQUEST)
-public interface ReadWriteMultiRowReplicaRequest extends MultipleRowReplicaRequest, ReadWriteReplicaRequest, CommittableTxRequest {
+public interface ReadWriteMultiRowReplicaRequest extends MultipleRowReplicaRequest, ReadWriteReplicaRequest {
/**
* Disable delayed ack optimization.
*
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replication/request/ReadWriteReplicaRequest.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replication/request/ReadWriteReplicaRequest.java
index 92e28313dd..34a679be1a 100644
--- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replication/request/ReadWriteReplicaRequest.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replication/request/ReadWriteReplicaRequest.java
@@ -20,8 +20,28 @@ package org.apache.ignite.internal.table.distributed.replication.request;
import java.util.UUID;
import org.apache.ignite.internal.replicator.message.PrimaryReplicaRequest;
import org.apache.ignite.internal.replicator.message.TimestampAware;
+import org.apache.ignite.internal.table.distributed.command.TablePartitionIdMessage;
/** Read-write replica request. */
public interface ReadWriteReplicaRequest extends PrimaryReplicaRequest, TimestampAware {
UUID transactionId();
+
+ /**
+ * Get the transaction coordinator inconsistent ID.
+ *
+ * @return Transaction coordinator inconsistent ID.
+ */
+ String coordinatorId();
+
+ /**
+ * Return {@code true} if this is a full transaction.
+ */
+ boolean full();
+
+ /**
+ * Gets a commit partition id.
+ *
+ * @return Table partition id.
+ */
+ TablePartitionIdMessage commitPartitionId();
}
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replication/request/ReadWriteScanRetrieveBatchReplicaRequest.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replication/request/ReadWriteScanRetrieveBatchReplicaRequest.java
index 7a2061a815..a0a9517bbf 100644
--- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replication/request/ReadWriteScanRetrieveBatchReplicaRequest.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replication/request/ReadWriteScanRetrieveBatchReplicaRequest.java
@@ -24,6 +24,5 @@ import org.apache.ignite.internal.table.distributed.TableMessageGroup;
* Scan retrieve batch replica request.
*/
@Transferable(TableMessageGroup.RW_SCAN_RETRIEVE_BATCH_REPLICA_REQUEST)
-public interface ReadWriteScanRetrieveBatchReplicaRequest extends ScanRetrieveBatchReplicaRequest, ReadWriteReplicaRequest,
- CommittableTxRequest {
+public interface ReadWriteScanRetrieveBatchReplicaRequest extends ScanRetrieveBatchReplicaRequest, ReadWriteReplicaRequest {
}
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replication/request/ReadWriteSingleRowPkReplicaRequest.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replication/request/ReadWriteSingleRowPkReplicaRequest.java
index 34185e748b..106d9ca720 100644
--- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replication/request/ReadWriteSingleRowPkReplicaRequest.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replication/request/ReadWriteSingleRowPkReplicaRequest.java
@@ -24,5 +24,5 @@ import org.apache.ignite.internal.table.distributed.TableMessageGroup;
* Read-write single-row replica request involving a table's Primary Key.
*/
@Transferable(TableMessageGroup.RW_SINGLE_ROW_PK_REPLICA_REQUEST)
-public interface ReadWriteSingleRowPkReplicaRequest extends SingleRowPkReplicaRequest, ReadWriteReplicaRequest, CommittableTxRequest {
+public interface ReadWriteSingleRowPkReplicaRequest extends SingleRowPkReplicaRequest, ReadWriteReplicaRequest {
}
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replication/request/ReadWriteSingleRowReplicaRequest.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replication/request/ReadWriteSingleRowReplicaRequest.java
index d727f9b1eb..2a935be57f 100644
--- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replication/request/ReadWriteSingleRowReplicaRequest.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replication/request/ReadWriteSingleRowReplicaRequest.java
@@ -24,5 +24,5 @@ import org.apache.ignite.internal.table.distributed.TableMessageGroup;
* Read-write single-row replica request.
*/
@Transferable(TableMessageGroup.RW_SINGLE_ROW_REPLICA_REQUEST)
-public interface ReadWriteSingleRowReplicaRequest extends SingleRowReplicaRequest, ReadWriteReplicaRequest, CommittableTxRequest {
+public interface ReadWriteSingleRowReplicaRequest extends SingleRowReplicaRequest, ReadWriteReplicaRequest {
}
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replication/request/ReadWriteSwapRowReplicaRequest.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replication/request/ReadWriteSwapRowReplicaRequest.java
index 2845395c73..1553635a45 100644
--- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replication/request/ReadWriteSwapRowReplicaRequest.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replication/request/ReadWriteSwapRowReplicaRequest.java
@@ -24,6 +24,6 @@ import org.apache.ignite.internal.table.distributed.TableMessageGroup;
* Read-write dual row replica request.
*/
@Transferable(TableMessageGroup.RW_DUAL_ROW_REPLICA_REQUEST)
-public interface ReadWriteSwapRowReplicaRequest extends SwapRowReplicaRequest, ReadWriteReplicaRequest, CommittableTxRequest {
+public interface ReadWriteSwapRowReplicaRequest extends SwapRowReplicaRequest, ReadWriteReplicaRequest {
}
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
index e663896260..728268da12 100644
--- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
@@ -138,7 +138,6 @@ import org.apache.ignite.internal.table.distributed.raft.UnexpectedTransactionSt
import org.apache.ignite.internal.table.distributed.replication.request.BinaryRowMessage;
import org.apache.ignite.internal.table.distributed.replication.request.BinaryTupleMessage;
import org.apache.ignite.internal.table.distributed.replication.request.BuildIndexReplicaRequest;
-import org.apache.ignite.internal.table.distributed.replication.request.CommittableTxRequest;
import org.apache.ignite.internal.table.distributed.replication.request.ReadOnlyDirectMultiRowReplicaRequest;
import org.apache.ignite.internal.table.distributed.replication.request.ReadOnlyDirectSingleRowReplicaRequest;
import org.apache.ignite.internal.table.distributed.replication.request.ReadOnlyMultiRowPkReplicaRequest;
@@ -451,14 +450,14 @@ public class PartitionReplicaListener implements ReplicaListener {
assert ((SchemaVersionAwareReplicaRequest) request).schemaVersion() > 0 : "No schema version passed?";
}
- if (request instanceof CommittableTxRequest) {
- var req = (CommittableTxRequest) request;
+ if (request instanceof ReadWriteReplicaRequest) {
+ var req = (ReadWriteReplicaRequest) request;
// Saving state is not needed for full transactions.
if (!req.full()) {
txManager.updateTxMeta(req.transactionId(), old -> new TxStateMeta(
PENDING,
- senderId,
+ req.coordinatorId(),
req.commitPartitionId().asTablePartitionId(),
null
));
@@ -474,7 +473,7 @@ public class PartitionReplicaListener implements ReplicaListener {
return validateTableExistence(request, opTsIfDirectRo)
.thenCompose(unused -> validateSchemaMatch(request, opTsIfDirectRo))
.thenCompose(unused -> waitForSchemasBeforeReading(request, opTsIfDirectRo))
- .thenCompose(unused -> processOperationRequestWithTxRwCounter(request, isPrimary, senderId, opTsIfDirectRo));
+ .thenCompose(unused -> processOperationRequestWithTxRwCounter(request, isPrimary, opTsIfDirectRo));
}
/**
@@ -658,29 +657,28 @@ public class PartitionReplicaListener implements ReplicaListener {
private CompletableFuture<?> processOperationRequest(
ReplicaRequest request,
@Nullable Boolean isPrimary,
- String senderId,
@Nullable HybridTimestamp opStartTsIfDirectRo
) {
if (request instanceof ReadWriteSingleRowReplicaRequest) {
var req = (ReadWriteSingleRowReplicaRequest) request;
- return appendTxCommand(req.transactionId(), req.requestType(), req.full(), () -> processSingleEntryAction(req, senderId));
+ return appendTxCommand(req.transactionId(), req.requestType(), req.full(), () -> processSingleEntryAction(req));
} else if (request instanceof ReadWriteSingleRowPkReplicaRequest) {
var req = (ReadWriteSingleRowPkReplicaRequest) request;
- return appendTxCommand(req.transactionId(), req.requestType(), req.full(), () -> processSingleEntryAction(req, senderId));
+ return appendTxCommand(req.transactionId(), req.requestType(), req.full(), () -> processSingleEntryAction(req));
} else if (request instanceof ReadWriteMultiRowReplicaRequest) {
var req = (ReadWriteMultiRowReplicaRequest) request;
- return appendTxCommand(req.transactionId(), req.requestType(), req.full(), () -> processMultiEntryAction(req, senderId));
+ return appendTxCommand(req.transactionId(), req.requestType(), req.full(), () -> processMultiEntryAction(req));
} else if (request instanceof ReadWriteMultiRowPkReplicaRequest) {
var req = (ReadWriteMultiRowPkReplicaRequest) request;
- return appendTxCommand(req.transactionId(), req.requestType(), req.full(), () -> processMultiEntryAction(req, senderId));
+ return appendTxCommand(req.transactionId(), req.requestType(), req.full(), () -> processMultiEntryAction(req));
} else if (request instanceof ReadWriteSwapRowReplicaRequest) {
var req = (ReadWriteSwapRowReplicaRequest) request;
- return appendTxCommand(req.transactionId(), req.requestType(), req.full(), () -> processTwoEntriesAction(req, senderId));
+ return appendTxCommand(req.transactionId(), req.requestType(), req.full(), () -> processTwoEntriesAction(req));
} else if (request instanceof ReadWriteScanRetrieveBatchReplicaRequest) {
var req = (ReadWriteScanRetrieveBatchReplicaRequest) request;
@@ -691,13 +689,13 @@ public class PartitionReplicaListener implements ReplicaListener {
// If they don't fit the bucket, the transaction is treated as 2pc.
txManager.updateTxMeta(req.transactionId(), old -> new TxStateMeta(
PENDING,
- senderId,
+ req.coordinatorId(),
req.commitPartitionId().asTablePartitionId(),
null
));
// Implicit RW scan can be committed locally on a last batch or error.
- return appendTxCommand(req.transactionId(), RequestType.RW_SCAN, false, () -> processScanRetrieveBatchAction(req, senderId))
+ return appendTxCommand(req.transactionId(), RequestType.RW_SCAN, false, () -> processScanRetrieveBatchAction(req))
.thenCompose(rows -> {
if (allElementsAreNull(rows)) {
return completedFuture(rows);
@@ -722,7 +720,7 @@ public class PartitionReplicaListener implements ReplicaListener {
return nullCompletedFuture();
} else if (request instanceof TxFinishReplicaRequest) {
- return processTxFinishAction((TxFinishReplicaRequest) request, senderId);
+ return processTxFinishAction((TxFinishReplicaRequest) request);
} else if (request instanceof WriteIntentSwitchReplicaRequest) {
return processWriteIntentSwitchAction((WriteIntentSwitchReplicaRequest) request);
} else if (request instanceof ReadOnlySingleRowPkReplicaRequest) {
@@ -1157,8 +1155,7 @@ public class PartitionReplicaListener implements ReplicaListener {
* @return Listener response.
*/
private CompletableFuture<List<BinaryRow>> processScanRetrieveBatchAction(
- ReadWriteScanRetrieveBatchReplicaRequest request,
- String txCoordinatorId
+ ReadWriteScanRetrieveBatchReplicaRequest request
) {
if (request.indexToUse() != null) {
TableSchemaAwareIndexStorage indexStorage = secondaryIndexStorages.get().get(request.indexToUse());
@@ -1549,11 +1546,10 @@ public class PartitionReplicaListener implements ReplicaListener {
* </ol>
*
* @param request Transaction finish request.
- * @param txCoordinatorId Transaction coordinator id.
* @return future result of the operation.
*/
// TODO: need to properly handle primary replica changes https://issues.apache.org/jira/browse/IGNITE-17615
- private CompletableFuture<TransactionResult> processTxFinishAction(TxFinishReplicaRequest request, String txCoordinatorId) {
+ private CompletableFuture<TransactionResult> processTxFinishAction(TxFinishReplicaRequest request) {
// TODO: https://issues.apache.org/jira/browse/IGNITE-19170 Use ZonePartitionIdMessage and remove cast
Collection<TablePartitionId> enlistedGroups = (Collection<TablePartitionId>) (Collection<?>) request.groups();
@@ -1568,15 +1564,14 @@ public class PartitionReplicaListener implements ReplicaListener {
enlistedGroups,
validationResult.isSuccessful(),
validationResult.isSuccessful() ? commitTimestamp : null,
- txId,
- txCoordinatorId
+ txId
).thenApply(txResult -> {
throwIfSchemaValidationOnCommitFailed(validationResult, txResult);
return txResult;
}));
} else {
// Aborting.
- return finishAndCleanup(enlistedGroups, false, null, txId, txCoordinatorId);
+ return finishAndCleanup(enlistedGroups, false, null, txId);
}
}
@@ -1604,8 +1599,7 @@ public class PartitionReplicaListener implements ReplicaListener {
Collection<TablePartitionId> enlistedPartitions,
boolean commit,
@Nullable HybridTimestamp commitTimestamp,
- UUID txId,
- String txCoordinatorId
+ UUID txId
) {
// Read TX state from the storage, we will need this state to check if the locks are released.
// Since this state is written only on the transaction finish (see PartitionListener.handleFinishTxCommand),
@@ -1653,7 +1647,7 @@ public class PartitionReplicaListener implements ReplicaListener {
return completedFuture(new TransactionResult(txMeta.txState(), txMeta.commitTimestamp()));
}
- return finishTransaction(enlistedPartitions, txId, commit, commitTimestamp, txCoordinatorId)
+ return finishTransaction(enlistedPartitions, txId, commit, commitTimestamp)
.thenCompose(txResult ->
txManager.cleanup(enlistedPartitions, commit, commitTimestamp, txId)
.thenApply(v -> txResult)
@@ -1667,15 +1661,13 @@ public class PartitionReplicaListener implements ReplicaListener {
* @param txId Transaction id.
* @param commit True if the transaction is committed, false otherwise.
* @param commitTimestamp Commit timestamp, if applicable.
- * @param txCoordinatorId Transaction coordinator id.
* @return Future to wait for the finish.
*/
private CompletableFuture<TransactionResult> finishTransaction(
Collection<TablePartitionId> aggregatedGroupIds,
UUID txId,
boolean commit,
- @Nullable HybridTimestamp commitTimestamp,
- String txCoordinatorId
+ @Nullable HybridTimestamp commitTimestamp
) {
assert !(commit && commitTimestamp == null) : "Cannot commit without the timestamp.";
@@ -1686,7 +1678,6 @@ public class PartitionReplicaListener implements ReplicaListener {
txId,
commit,
commitTimestamp,
- txCoordinatorId,
catalogVersion,
aggregatedGroupIds.stream()
.map(PartitionReplicaListener::tablePartitionId)
@@ -1721,7 +1712,6 @@ public class PartitionReplicaListener implements ReplicaListener {
UUID transactionId,
boolean commit,
HybridTimestamp commitTimestamp,
- String txCoordinatorId,
int catalogVersion,
List<TablePartitionIdMessage> tablePartitionIds
) {
@@ -1730,7 +1720,6 @@ public class PartitionReplicaListener implements ReplicaListener {
.txId(transactionId)
.commit(commit)
.safeTimeLong(hybridClock.nowLong())
- .txCoordinatorId(txCoordinatorId)
.requiredCatalogVersion(catalogVersion)
.tablePartitionIds(tablePartitionIds);
@@ -1833,7 +1822,6 @@ public class PartitionReplicaListener implements ReplicaListener {
.commit(commit)
.commitTimestampLong(commitTimestampLong)
.safeTimeLong(hybridClock.nowLong())
- .txCoordinatorId(getTxCoordinatorId(transactionId))
.requiredCatalogVersion(catalogVersion)
.build();
@@ -1852,14 +1840,6 @@ public class PartitionReplicaListener implements ReplicaListener {
.thenApply(res -> null);
}
- private @Nullable String getTxCoordinatorId(UUID txId) {
- TxStateMeta meta = txManager.stateMeta(txId);
-
- assert meta != null : "Trying to cleanup a transaction that was not enlisted, txId=" + txId;
-
- return meta.txCoordinatorId();
- }
-
/**
* Creates a future that waits all transaction operations are completed.
*
@@ -2155,10 +2135,9 @@ public class PartitionReplicaListener implements ReplicaListener {
* Processes multi request.
*
* @param request Multi request operation.
- * @param txCoordinatorId Transaction coordinator id.
* @return Listener response.
*/
- private CompletableFuture<ReplicaResult> processMultiEntryAction(ReadWriteMultiRowReplicaRequest request, String txCoordinatorId) {
+ private CompletableFuture<ReplicaResult> processMultiEntryAction(ReadWriteMultiRowReplicaRequest request) {
UUID txId = request.transactionId();
TablePartitionId commitPartitionId = request.commitPartitionId().asTablePartitionId();
List<BinaryRow> searchRows = request.binaryRows();
@@ -2219,7 +2198,6 @@ public class PartitionReplicaListener implements ReplicaListener {
catalogVersion -> applyUpdateAllCommand(
request,
rowIdsToDelete,
- txCoordinatorId,
catalogVersion
)
)
@@ -2286,7 +2264,6 @@ public class PartitionReplicaListener implements ReplicaListener {
.thenCompose(catalogVersion -> applyUpdateAllCommand(
request,
convertedMap,
- txCoordinatorId,
catalogVersion
)
)
@@ -2350,7 +2327,6 @@ public class PartitionReplicaListener implements ReplicaListener {
catalogVersion -> applyUpdateAllCommand(
request,
rowsToUpdate,
- txCoordinatorId,
catalogVersion
)
)
@@ -2376,10 +2352,9 @@ public class PartitionReplicaListener implements ReplicaListener {
* Processes multi request.
*
* @param request Multi request operation.
- * @param txCoordinatorId Transaction coordinator id.
* @return Listener response.
*/
- private CompletableFuture<?> processMultiEntryAction(ReadWriteMultiRowPkReplicaRequest request, String txCoordinatorId) {
+ private CompletableFuture<?> processMultiEntryAction(ReadWriteMultiRowPkReplicaRequest request) {
UUID txId = request.transactionId();
TablePartitionId committedPartitionId = request.commitPartitionId().asTablePartitionId();
List<BinaryTuple> primaryKeys = resolvePks(request.primaryKeys());
@@ -2471,7 +2446,7 @@ public class PartitionReplicaListener implements ReplicaListener {
request.commitPartitionId(),
request.transactionId(),
request.full(),
- txCoordinatorId,
+ request.coordinatorId(),
catalogVersion,
request.skipDelayedAck()
)
@@ -2674,7 +2649,6 @@ public class PartitionReplicaListener implements ReplicaListener {
* @param rowUuid Row UUID.
* @param row Row.
* @param lastCommitTimestamp The timestamp of the last committed entry for the row.
- * @param txCoordinatorId Transaction coordinator id.
* @param catalogVersion Validated catalog version associated with given operation.
* @return A local update ready future, possibly having a nested replication future as a result for delayed ack purpose.
*/
@@ -2683,7 +2657,6 @@ public class PartitionReplicaListener implements ReplicaListener {
UUID rowUuid,
@Nullable BinaryRow row,
@Nullable HybridTimestamp lastCommitTimestamp,
- String txCoordinatorId,
int catalogVersion
) {
return applyUpdateCommand(
@@ -2693,7 +2666,7 @@ public class PartitionReplicaListener implements ReplicaListener {
lastCommitTimestamp,
request.transactionId(),
request.full(),
- txCoordinatorId,
+ request.coordinatorId(),
catalogVersion
);
}
@@ -2807,14 +2780,12 @@ public class PartitionReplicaListener implements ReplicaListener {
*
* @param request Read write multi rows replica request.
* @param rowsToUpdate All {@link BinaryRow}s represented as {@link TimedBinaryRowMessage}s to be updated.
- * @param txCoordinatorId Transaction coordinator id.
* @param catalogVersion Validated catalog version associated with given operation.
* @return Raft future, see {@link #applyCmdWithExceptionHandling(Command, CompletableFuture)}.
*/
private CompletableFuture<CompletableFuture<?>> applyUpdateAllCommand(
ReadWriteMultiRowReplicaRequest request,
Map<UUID, TimedBinaryRowMessage> rowsToUpdate,
- String txCoordinatorId,
int catalogVersion
) {
return applyUpdateAllCommand(
@@ -2822,7 +2793,7 @@ public class PartitionReplicaListener implements ReplicaListener {
request.commitPartitionId(),
request.transactionId(),
request.full(),
- txCoordinatorId,
+ request.coordinatorId(),
catalogVersion,
request.skipDelayedAck()
);
@@ -2854,10 +2825,9 @@ public class PartitionReplicaListener implements ReplicaListener {
* Processes single request.
*
* @param request Single request operation.
- * @param txCoordinatorId Transaction coordinator id.
* @return Listener response.
*/
- private CompletableFuture<ReplicaResult> processSingleEntryAction(ReadWriteSingleRowReplicaRequest request, String txCoordinatorId) {
+ private CompletableFuture<ReplicaResult> processSingleEntryAction(ReadWriteSingleRowReplicaRequest request) {
UUID txId = request.transactionId();
BinaryRow searchRow = request.binaryRow();
TablePartitionId commitPartitionId = request.commitPartitionId().asTablePartitionId();
@@ -2885,7 +2855,6 @@ public class PartitionReplicaListener implements ReplicaListener {
validatedRowId.uuid(),
null,
lastCommitTime,
- txCoordinatorId,
catalogVersion
)
)
@@ -2909,7 +2878,6 @@ public class PartitionReplicaListener implements ReplicaListener {
rowId0.uuid(),
searchRow,
lastCommitTime,
- txCoordinatorId,
catalogVersion
)
)
@@ -2941,7 +2909,6 @@ public class PartitionReplicaListener implements ReplicaListener {
rowId0.uuid(),
searchRow,
lastCommitTime,
- txCoordinatorId,
catalogVersion
)
)
@@ -2973,7 +2940,6 @@ public class PartitionReplicaListener implements ReplicaListener {
rowId0.uuid(),
searchRow,
lastCommitTime,
- txCoordinatorId,
catalogVersion
)
)
@@ -3001,7 +2967,6 @@ public class PartitionReplicaListener implements ReplicaListener {
rowId.uuid(),
searchRow,
lastCommitTime,
- txCoordinatorId,
catalogVersion
)
)
@@ -3029,7 +2994,6 @@ public class PartitionReplicaListener implements ReplicaListener {
rowId.uuid(),
searchRow,
lastCommitTime,
- txCoordinatorId,
catalogVersion
)
)
@@ -3053,10 +3017,9 @@ public class PartitionReplicaListener implements ReplicaListener {
* Processes single request.
*
* @param request Single request operation.
- * @param txCoordinatorId Transaction coordinator id.
* @return Listener response.
*/
- private CompletableFuture<ReplicaResult> processSingleEntryAction(ReadWriteSingleRowPkReplicaRequest request, String txCoordinatorId) {
+ private CompletableFuture<ReplicaResult> processSingleEntryAction(ReadWriteSingleRowPkReplicaRequest request) {
UUID txId = request.transactionId();
BinaryTuple primaryKey = resolvePk(request.primaryKey());
TablePartitionId commitPartitionId = request.commitPartitionId().asTablePartitionId();
@@ -3093,7 +3056,7 @@ public class PartitionReplicaListener implements ReplicaListener {
lastCommitTime,
request.transactionId(),
request.full(),
- txCoordinatorId,
+ request.coordinatorId(),
catalogVersion
)
)
@@ -3117,7 +3080,7 @@ public class PartitionReplicaListener implements ReplicaListener {
lastCommitTime,
request.transactionId(),
request.full(),
- txCoordinatorId,
+ request.coordinatorId(),
catalogVersion
)
)
@@ -3324,13 +3287,9 @@ public class PartitionReplicaListener implements ReplicaListener {
* Processes two actions.
*
* @param request Two actions operation request.
- * @param txCoordinatorId Transaction coordinator id.
* @return Listener response.
*/
- private CompletableFuture<ReplicaResult> processTwoEntriesAction(
- ReadWriteSwapRowReplicaRequest request,
- String txCoordinatorId
- ) {
+ private CompletableFuture<ReplicaResult> processTwoEntriesAction(ReadWriteSwapRowReplicaRequest request) {
BinaryRow newRow = request.newBinaryRow();
BinaryRow expectedRow = request.oldBinaryRow();
TablePartitionIdMessage commitPartitionId = request.commitPartitionId();
@@ -3361,7 +3320,7 @@ public class PartitionReplicaListener implements ReplicaListener {
lastCommitTime,
txId,
request.full(),
- txCoordinatorId,
+ request.coordinatorId(),
catalogVersion
)
)
@@ -3835,7 +3794,6 @@ public class PartitionReplicaListener implements ReplicaListener {
private CompletableFuture<?> processOperationRequestWithTxRwCounter(
ReplicaRequest request,
@Nullable Boolean isPrimary,
- String senderId,
@Nullable HybridTimestamp opStartTsIfDirectRo
) {
if (request instanceof ReadWriteReplicaRequest) {
@@ -3844,7 +3802,7 @@ public class PartitionReplicaListener implements ReplicaListener {
txRwOperationTracker.incrementOperationCount(rwTxActiveCatalogVersion(catalogService, (ReadWriteReplicaRequest) request));
}
- return processOperationRequest(request, isPrimary, senderId, opStartTsIfDirectRo)
+ return processOperationRequest(request, isPrimary, opStartTsIfDirectRo)
.whenComplete((unused, throwable) -> {
if (request instanceof ReadWriteReplicaRequest) {
txRwOperationTracker.decrementOperationCount(
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java
index f34f3dfed5..01e463165c 100644
--- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java
@@ -184,6 +184,7 @@ public class InternalTableImpl implements InternalTable {
* @param tableId Table id.
* @param partMap Map partition id to raft group.
* @param partitions Partitions.
+ * @param clusterNodeResolver Cluster node resolver.
* @param txManager Transaction manager.
* @param tableStorage Table storage.
* @param txStateStorage Transaction state storage.
@@ -453,6 +454,7 @@ public class InternalTableImpl implements InternalTable {
.batchSize(batchSize)
.enlistmentConsistencyToken(enlistmentConsistencyToken)
.commitPartitionId(serializeTablePartitionId(tx.commitPartition()))
+ .coordinatorId(tx.coordinatorId())
.build();
if (primaryReplicaAndConsistencyToken != null) {
@@ -785,6 +787,7 @@ public class InternalTableImpl implements InternalTable {
.requestType(RW_GET)
.timestampLong(clock.nowLong())
.full(tx == null)
+ .coordinatorId(txo.coordinatorId())
.build(),
(res, req) -> false,
false
@@ -916,6 +919,7 @@ public class InternalTableImpl implements InternalTable {
.requestType(requestType)
.timestampLong(clock.nowLong())
.full(full)
+ .coordinatorId(tx.coordinatorId())
.build();
}
@@ -982,6 +986,7 @@ public class InternalTableImpl implements InternalTable {
.requestType(RequestType.RW_UPSERT)
.timestampLong(clock.nowLong())
.full(tx == null)
+ .coordinatorId(txo.coordinatorId())
.build(),
(res, req) -> false,
false
@@ -1035,6 +1040,7 @@ public class InternalTableImpl implements InternalTable {
.requestType(RequestType.RW_GET_AND_UPSERT)
.timestampLong(clock.nowLong())
.full(tx == null)
+ .coordinatorId(txo.coordinatorId())
.build(),
(res, req) -> false,
false
@@ -1057,6 +1063,7 @@ public class InternalTableImpl implements InternalTable {
.requestType(RequestType.RW_INSERT)
.timestampLong(clock.nowLong())
.full(tx == null)
+ .coordinatorId(txo.coordinatorId())
.build(),
(res, req) -> !res,
false
@@ -1106,6 +1113,7 @@ public class InternalTableImpl implements InternalTable {
.requestType(requestType)
.timestampLong(clock.nowLong())
.full(full)
+ .coordinatorId(tx.coordinatorId())
.build();
}
@@ -1125,6 +1133,7 @@ public class InternalTableImpl implements InternalTable {
.requestType(RequestType.RW_REPLACE_IF_EXIST)
.timestampLong(clock.nowLong())
.full(tx == null)
+ .coordinatorId(txo.coordinatorId())
.build(),
(res, req) -> !res,
false
@@ -1151,6 +1160,7 @@ public class InternalTableImpl implements InternalTable {
.requestType(RequestType.RW_REPLACE)
.timestampLong(clock.nowLong())
.full(tx == null)
+ .coordinatorId(txo.coordinatorId())
.build(),
(res, req) -> !res,
false
@@ -1173,6 +1183,7 @@ public class InternalTableImpl implements InternalTable {
.requestType(RequestType.RW_GET_AND_REPLACE)
.timestampLong(clock.nowLong())
.full(tx == null)
+ .coordinatorId(txo.coordinatorId())
.build(),
(res, req) -> res == null,
false
@@ -1195,6 +1206,7 @@ public class InternalTableImpl implements InternalTable {
.requestType(RequestType.RW_DELETE)
.timestampLong(clock.nowLong())
.full(tx == null)
+ .coordinatorId(txo.coordinatorId())
.build(),
(res, req) -> !res,
false
@@ -1217,6 +1229,7 @@ public class InternalTableImpl implements InternalTable {
.requestType(RequestType.RW_DELETE_EXACT)
.timestampLong(clock.nowLong())
.full(tx == null)
+ .coordinatorId(txo.coordinatorId())
.build(),
(res, req) -> !res,
false
@@ -1239,6 +1252,7 @@ public class InternalTableImpl implements InternalTable {
.requestType(RequestType.RW_GET_AND_DELETE)
.timestampLong(clock.nowLong())
.full(tx == null)
+ .coordinatorId(txo.coordinatorId())
.build(),
(res, req) -> res == null,
false
@@ -1318,12 +1332,25 @@ public class InternalTableImpl implements InternalTable {
int partId,
UUID txId,
TablePartitionId commitPartition,
+ String coordinatorId,
PrimaryReplica recipient,
int indexId,
BinaryTuple key,
@Nullable BitSet columnsToInclude
) {
- return scan(partId, txId, commitPartition, recipient, indexId, key, null, null, 0, columnsToInclude);
+ return scan(
+ partId,
+ txId,
+ commitPartition,
+ coordinatorId,
+ recipient,
+ indexId,
+ key,
+ null,
+ null,
+ 0,
+ columnsToInclude
+ );
}
@Override
@@ -1461,6 +1488,7 @@ public class InternalTableImpl implements InternalTable {
int partId,
UUID txId,
TablePartitionId commitPartition,
+ String coordinatorId,
PrimaryReplica recipient,
@Nullable Integer indexId,
@Nullable BinaryTuplePrefix lowerBound,
@@ -1468,13 +1496,26 @@ public class InternalTableImpl implements InternalTable {
int flags,
@Nullable BitSet columnsToInclude
) {
- return scan(partId, txId, commitPartition, recipient, indexId, null, lowerBound, upperBound, flags, columnsToInclude);
+ return scan(
+ partId,
+ txId,
+ commitPartition,
+ coordinatorId,
+ recipient,
+ indexId,
+ null,
+ lowerBound,
+ upperBound,
+ flags,
+ columnsToInclude
+ );
}
private Publisher<BinaryRow> scan(
int partId,
UUID txId,
TablePartitionId commitPartition,
+ String coordinatorId,
PrimaryReplica recipient,
@Nullable Integer indexId,
@Nullable BinaryTuple exactKey,
@@ -1502,6 +1543,7 @@ public class InternalTableImpl implements InternalTable {
.enlistmentConsistencyToken(recipient.enlistmentConsistencyToken())
.full(false) // Set explicitly.
.commitPartitionId(serializeTablePartitionId(commitPartition))
+ .coordinatorId(coordinatorId)
.build();
return replicaSvc.invoke(recipient.node(), request);
diff --git a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/command/PartitionRaftCommandsSerializationTest.java b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/command/PartitionRaftCommandsSerializationTest.java
index ac7b8db3f8..6991fc62e8 100644
--- a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/command/PartitionRaftCommandsSerializationTest.java
+++ b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/command/PartitionRaftCommandsSerializationTest.java
@@ -200,7 +200,6 @@ public class PartitionRaftCommandsSerializationTest extends IgniteAbstractTest {
.txId(UUID.randomUUID())
.commit(true)
.commitTimestampLong(clock.nowLong())
- .txCoordinatorId(UUID.randomUUID().toString())
.build();
WriteIntentSwitchCommand readCmd = copyCommand(cmd);
@@ -227,7 +226,6 @@ public class PartitionRaftCommandsSerializationTest extends IgniteAbstractTest {
.commit(true)
.commitTimestampLong(clock.nowLong())
.tablePartitionIds(grps)
- .txCoordinatorId(UUID.randomUUID().toString())
.build();
FinishTxCommand readCmd = copyCommand(cmd);
@@ -249,7 +247,6 @@ public class PartitionRaftCommandsSerializationTest extends IgniteAbstractTest {
.commit(finishTxCommand.commit())
.tablePartitionIds(finishTxCommand.tablePartitionIds())
.commitTimestampLong(finishTxCommand.commitTimestampLong())
- .txCoordinatorId(finishTxCommand.txCoordinatorId())
.build();
} else if (cmd instanceof WriteIntentSwitchCommand) {
WriteIntentSwitchCommand writeIntentSwitchCommand = (WriteIntentSwitchCommand) cmd;
@@ -258,7 +255,6 @@ public class PartitionRaftCommandsSerializationTest extends IgniteAbstractTest {
.txId(writeIntentSwitchCommand.txId())
.commit(writeIntentSwitchCommand.commit())
.commitTimestampLong(writeIntentSwitchCommand.commitTimestampLong())
- .txCoordinatorId(writeIntentSwitchCommand.txCoordinatorId())
.build();
} else if (cmd instanceof UpdateCommand) {
UpdateCommand updateCommand = (UpdateCommand) cmd;
diff --git a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/PartitionCommandListenerTest.java b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/PartitionCommandListenerTest.java
index e729c1288d..31b4051ef7 100644
--- a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/PartitionCommandListenerTest.java
+++ b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/PartitionCommandListenerTest.java
@@ -643,7 +643,6 @@ public class PartitionCommandListenerTest extends BaseIgniteAbstractTest {
.commit(true)
.commitTimestampLong(commitTimestamp.longValue())
.safeTimeLong(hybridClock.nowLong())
- .txCoordinatorId(UUID.randomUUID().toString())
.build());
}
@@ -686,7 +685,6 @@ public class PartitionCommandListenerTest extends BaseIgniteAbstractTest {
.commit(true)
.commitTimestampLong(commitTimestamp.longValue())
.safeTimeLong(hybridClock.nowLong())
- .txCoordinatorId(UUID.randomUUID().toString())
.build());
}
@@ -724,7 +722,6 @@ public class PartitionCommandListenerTest extends BaseIgniteAbstractTest {
.commit(true)
.commitTimestampLong(commitTimestamp.longValue())
.safeTimeLong(hybridClock.nowLong())
- .txCoordinatorId(UUID.randomUUID().toString())
.build());
}
@@ -773,7 +770,6 @@ public class PartitionCommandListenerTest extends BaseIgniteAbstractTest {
.commit(true)
.commitTimestampLong(commitTimestamp.longValue())
.safeTimeLong(hybridClock.nowLong())
- .txCoordinatorId(UUID.randomUUID().toString())
.build()));
}
@@ -816,7 +812,6 @@ public class PartitionCommandListenerTest extends BaseIgniteAbstractTest {
.commit(true)
.commitTimestampLong(commitTimestamp.longValue())
.safeTimeLong(hybridClock.nowLong())
- .txCoordinatorId(UUID.randomUUID().toString())
.build()));
}
@@ -893,7 +888,6 @@ public class PartitionCommandListenerTest extends BaseIgniteAbstractTest {
.commit(true)
.commitTimestampLong(commitTimestamp)
.safeTimeLong(hybridClock.nowLong())
- .txCoordinatorId(UUID.randomUUID().toString())
.build()));
}
diff --git a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerIndexLockingTest.java b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerIndexLockingTest.java
index 7e1c403ff3..0d3f9e92db 100644
--- a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerIndexLockingTest.java
+++ b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerIndexLockingTest.java
@@ -213,10 +213,7 @@ public class PartitionReplicaListenerIndexLockingTest extends IgniteAbstractTest
when(catalogService.table(anyInt(), anyLong())).thenReturn(tableDescriptor);
- ClusterNode localNode = mock(ClusterNode.class);
-
- when(localNode.name()).thenReturn("localNode");
- when(localNode.id()).thenReturn("localNode");
+ ClusterNode localNode = DummyInternalTableImpl.LOCAL_NODE;
partitionReplicaListener = new PartitionReplicaListener(
TEST_MV_PARTITION_STORAGE,
@@ -305,6 +302,8 @@ public class PartitionReplicaListenerIndexLockingTest extends IgniteAbstractTest
insertRows(List.of(new Pair<>(testBinaryRow, rowId)), TestTransactionIds.newTransactionId());
}
+ ClusterNode localNode = DummyInternalTableImpl.LOCAL_NODE;
+
ReplicaRequest request;
switch (arg.type) {
@@ -318,6 +317,7 @@ public class PartitionReplicaListenerIndexLockingTest extends IgniteAbstractTest
.schemaVersion(testPk.schemaVersion())
.primaryKey(testPk.tupleSlice())
.requestType(arg.type)
+ .coordinatorId(localNode.id())
.build();
break;
@@ -336,6 +336,7 @@ public class PartitionReplicaListenerIndexLockingTest extends IgniteAbstractTest
.schemaVersion(testBinaryRow.schemaVersion())
.binaryTuple(testBinaryRow.tupleSlice())
.requestType(arg.type)
+ .coordinatorId(localNode.id())
.build();
break;
@@ -388,6 +389,8 @@ public class PartitionReplicaListenerIndexLockingTest extends IgniteAbstractTest
}
}
+ ClusterNode localNode = DummyInternalTableImpl.LOCAL_NODE;
+
ReplicaRequest request;
switch (arg.type) {
@@ -400,6 +403,7 @@ public class PartitionReplicaListenerIndexLockingTest extends IgniteAbstractTest
.schemaVersion(pks.iterator().next().schemaVersion())
.primaryKeys(pks.stream().map(BinaryRow::tupleSlice).collect(toList()))
.requestType(arg.type)
+ .coordinatorId(localNode.id())
.build();
break;
@@ -415,6 +419,7 @@ public class PartitionReplicaListenerIndexLockingTest extends IgniteAbstractTest
.schemaVersion(rows.iterator().next().schemaVersion())
.binaryTuples(binaryRowsToBuffers(rows))
.requestType(arg.type)
+ .coordinatorId(localNode.id())
.build();
break;
diff --git a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerTest.java b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerTest.java
index a94b718741..0da7ed80b1 100644
--- a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerTest.java
+++ b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerTest.java
@@ -802,6 +802,7 @@ public class PartitionReplicaListenerTest extends IgniteAbstractTest {
.indexToUse(sortedIndexId)
.batchSize(4)
.commitPartitionId(commitPartitionId())
+ .coordinatorId(localNode.id())
.build(), localNode.id());
List<BinaryRow> rows = (List<BinaryRow>) fut.get(1, TimeUnit.SECONDS).result();
@@ -819,6 +820,7 @@ public class PartitionReplicaListenerTest extends IgniteAbstractTest {
.indexToUse(sortedIndexId)
.batchSize(4)
.commitPartitionId(commitPartitionId())
+ .coordinatorId(localNode.id())
.build(), localNode.id());
rows = (List<BinaryRow>) fut.get(1, TimeUnit.SECONDS).result();
@@ -839,6 +841,7 @@ public class PartitionReplicaListenerTest extends IgniteAbstractTest {
.flags(SortedIndexStorage.LESS_OR_EQUAL)
.batchSize(5)
.commitPartitionId(commitPartitionId())
+ .coordinatorId(localNode.id())
.build(), localNode.id());
rows = (List<BinaryRow>) fut.get(1, TimeUnit.SECONDS).result();
@@ -857,6 +860,7 @@ public class PartitionReplicaListenerTest extends IgniteAbstractTest {
.lowerBoundPrefix(toIndexBound(5))
.batchSize(5)
.commitPartitionId(commitPartitionId())
+ .coordinatorId(localNode.id())
.build(), localNode.id());
rows = (List<BinaryRow>) fut.get(1, TimeUnit.SECONDS).result();
@@ -875,6 +879,7 @@ public class PartitionReplicaListenerTest extends IgniteAbstractTest {
.exactKey(toIndexKey(0))
.batchSize(5)
.commitPartitionId(commitPartitionId())
+ .coordinatorId(localNode.id())
.build(), localNode.id());
rows = (List<BinaryRow>) fut.get(1, TimeUnit.SECONDS).result();
@@ -1194,6 +1199,7 @@ public class PartitionReplicaListenerTest extends IgniteAbstractTest {
.binaryTuple(binaryRow.tupleSlice())
.enlistmentConsistencyToken(1L)
.commitPartitionId(commitPartitionId())
+ .coordinatorId(localNode.id())
.full(full)
.build(),
localNode.id()
@@ -1213,6 +1219,7 @@ public class PartitionReplicaListenerTest extends IgniteAbstractTest {
.primaryKey(binaryRow.tupleSlice())
.enlistmentConsistencyToken(1L)
.commitPartitionId(commitPartitionId())
+ .coordinatorId(localNode.id())
.full(full)
.build(),
localNode.id()
@@ -1239,6 +1246,7 @@ public class PartitionReplicaListenerTest extends IgniteAbstractTest {
.binaryTuples(binaryRowsToBuffers(binaryRows))
.enlistmentConsistencyToken(1L)
.commitPartitionId(commitPartitionId())
+ .coordinatorId(localNode.id())
.full(full)
.build(),
localNode.id()
@@ -1262,6 +1270,7 @@ public class PartitionReplicaListenerTest extends IgniteAbstractTest {
.primaryKeys(binaryRowsToBuffers(binaryRows))
.enlistmentConsistencyToken(1L)
.commitPartitionId(commitPartitionId())
+ .coordinatorId(localNode.id())
.full(full)
.build(),
localNode.id()
@@ -1286,6 +1295,7 @@ public class PartitionReplicaListenerTest extends IgniteAbstractTest {
.binaryTuple(binaryRow.tupleSlice())
.enlistmentConsistencyToken(1L)
.commitPartitionId(commitPartitionId())
+ .coordinatorId(localNode.id())
.build();
},
() -> checkRowInMvStorage(binaryRow(0), true)
@@ -1314,6 +1324,7 @@ public class PartitionReplicaListenerTest extends IgniteAbstractTest {
.binaryTuples(asList(binaryRow0.tupleSlice(), binaryRow1.tupleSlice()))
.enlistmentConsistencyToken(1L)
.commitPartitionId(commitPartitionId())
+ .coordinatorId(localNode.id())
.build();
},
() -> checkRowInMvStorage(binaryRow(0), true)
@@ -1804,6 +1815,7 @@ public class PartitionReplicaListenerTest extends IgniteAbstractTest {
.newBinaryTuple(newRow.tupleSlice())
.enlistmentConsistencyToken(1L)
.commitPartitionId(commitPartitionId())
+ .coordinatorId(localNode.id())
.full(full)
.build(),
localNode.id()
@@ -1823,6 +1835,7 @@ public class PartitionReplicaListenerTest extends IgniteAbstractTest {
.scanId(1)
.batchSize(100)
.commitPartitionId(commitPartitionId())
+ .coordinatorId(localNode.id())
.build(),
localNode.id()
)
@@ -1841,6 +1854,7 @@ public class PartitionReplicaListenerTest extends IgniteAbstractTest {
.scanId(1)
.batchSize(100)
.commitPartitionId(commitPartitionId())
+ .coordinatorId(localNode.id())
.build(),
localNode.id()
)
@@ -1864,6 +1878,7 @@ public class PartitionReplicaListenerTest extends IgniteAbstractTest {
.batchSize(100)
.full(false)
.commitPartitionId(commitPartitionId())
+ .coordinatorId(localNode.id())
.build(),
localNode.id()
);
@@ -1875,6 +1890,9 @@ public class PartitionReplicaListenerTest extends IgniteAbstractTest {
.groupId(grpId)
.transactionId(targetTxId)
.scanId(1)
+ // TODO: both following lines will not be needed after IGNITE-21290
+ .coordinatorId(localNode.id())
+ .commitPartitionId(commitPartitionId())
.build(),
localNode.id()
);
@@ -2489,6 +2507,7 @@ public class PartitionReplicaListenerTest extends IgniteAbstractTest {
.binaryTuple(row.tupleSlice())
.enlistmentConsistencyToken(1L)
.commitPartitionId(commitPartitionId())
+ .coordinatorId(localNode.id())
.build(),
localNode.id()
).join().result();
@@ -2503,6 +2522,7 @@ public class PartitionReplicaListenerTest extends IgniteAbstractTest {
.primaryKey(row.tupleSlice())
.enlistmentConsistencyToken(1L)
.commitPartitionId(commitPartitionId())
+ .coordinatorId(localNode.id())
.build(),
localNode.id()
).join().result();
diff --git a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/schema/PartitionCommandsMarshallerImplTest.java b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/schema/PartitionCommandsMarshallerImplTest.java
index dd8fcda2da..32c23ca163 100644
--- a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/schema/PartitionCommandsMarshallerImplTest.java
+++ b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/schema/PartitionCommandsMarshallerImplTest.java
@@ -99,7 +99,6 @@ class PartitionCommandsMarshallerImplTest {
.txId(UUID.randomUUID())
.tablePartitionIds(List.of())
.requiredCatalogVersion(requiredCatalogVersion)
- .txCoordinatorId(UUID.randomUUID().toString())
.build();
}
diff --git a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/InternalTransaction.java b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/InternalTransaction.java
index f4faee3397..a0ea3e5b24 100644
--- a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/InternalTransaction.java
+++ b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/InternalTransaction.java
@@ -91,6 +91,13 @@ public interface InternalTransaction extends Transaction {
*/
HybridTimestamp startTimestamp();
+ /**
+ * Get the transaction coordinator inconsistent ID.
+ *
+ * @return Transaction coordinator inconsistent ID.
+ */
+ String coordinatorId();
+
/**
* Finishes a read-only transaction with a specific execution timestamp.
*
diff --git a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/IgniteAbstractTransactionImpl.java b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/IgniteAbstractTransactionImpl.java
index 7260ea9e46..699a4ff803 100644
--- a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/IgniteAbstractTransactionImpl.java
+++ b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/IgniteAbstractTransactionImpl.java
@@ -40,15 +40,22 @@ public abstract class IgniteAbstractTransactionImpl implements InternalTransacti
/** The transaction manager. */
protected final TxManager txManager;
+ /**
+ * Transaction coordinator inconsistent ID.
+ */
+ private final String coordinatorId;
+
/**
* The constructor.
*
* @param txManager The tx manager.
* @param id The id.
+ * @param coordinatorId Transaction coordinator inconsistent ID.
*/
- public IgniteAbstractTransactionImpl(TxManager txManager, UUID id) {
+ public IgniteAbstractTransactionImpl(TxManager txManager, UUID id, String coordinatorId) {
this.txManager = txManager;
this.id = id;
+ this.coordinatorId = coordinatorId;
}
/** {@inheritDoc} */
@@ -57,6 +64,16 @@ public abstract class IgniteAbstractTransactionImpl implements InternalTransacti
return id;
}
+ /**
+ * Get the transaction coordinator inconsistent ID.
+ *
+ * @return Transaction coordinator inconsistent ID.
+ */
+ @Override
+ public String coordinatorId() {
+ return coordinatorId;
+ }
+
/** {@inheritDoc} */
@Override
public @Nullable TxState state() {
diff --git a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/ReadOnlyTransactionImpl.java b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/ReadOnlyTransactionImpl.java
index ce675e5688..3d80baba8f 100644
--- a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/ReadOnlyTransactionImpl.java
+++ b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/ReadOnlyTransactionImpl.java
@@ -49,15 +49,17 @@ class ReadOnlyTransactionImpl extends IgniteAbstractTransactionImpl {
* @param txManager The tx manager.
* @param observableTsTracker Observable timestamp tracker.
* @param id The id.
+ * @param txCoordinatorId Transaction coordinator inconsistent ID.
* @param readTimestamp The read timestamp.
*/
ReadOnlyTransactionImpl(
TxManagerImpl txManager,
HybridTimestampTracker observableTsTracker,
UUID id,
+ String txCoordinatorId,
HybridTimestamp readTimestamp
) {
- super(txManager, id);
+ super(txManager, id, txCoordinatorId);
this.readTimestamp = readTimestamp;
this.observableTsTracker = observableTsTracker;
diff --git a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/ReadWriteTransactionImpl.java b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/ReadWriteTransactionImpl.java
index 0da8b17b4f..ce00f55469 100644
--- a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/ReadWriteTransactionImpl.java
+++ b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/ReadWriteTransactionImpl.java
@@ -68,9 +68,15 @@ public class ReadWriteTransactionImpl extends IgniteAbstractTransactionImpl {
* @param txManager The tx manager.
* @param observableTsTracker Observable timestamp tracker.
* @param id The id.
+ * @param txCoordinatorId Transaction coordinator inconsistent ID.
*/
- public ReadWriteTransactionImpl(TxManager txManager, HybridTimestampTracker observableTsTracker, UUID id) {
- super(txManager, id);
+ public ReadWriteTransactionImpl(
+ TxManager txManager,
+ HybridTimestampTracker observableTsTracker,
+ UUID id,
+ String txCoordinatorId
+ ) {
+ super(txManager, id, txCoordinatorId);
this.observableTsTracker = observableTsTracker;
}
diff --git a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxManagerImpl.java b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxManagerImpl.java
index f1113d858f..b5eb750e59 100644
--- a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxManagerImpl.java
+++ b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxManagerImpl.java
@@ -283,7 +283,7 @@ public class TxManagerImpl implements TxManager, NetworkMessageHandler {
updateTxMeta(txId, old -> new TxStateMeta(PENDING, localNodeId, null, null));
if (!readOnly) {
- return new ReadWriteTransactionImpl(this, timestampTracker, txId);
+ return new ReadWriteTransactionImpl(this, timestampTracker, txId, localNodeId);
}
HybridTimestamp observableTimestamp = timestampTracker.get();
@@ -319,7 +319,7 @@ public class TxManagerImpl implements TxManager, NetworkMessageHandler {
);
}
- return new ReadOnlyTransactionImpl(this, timestampTracker, txId, readTimestamp);
+ return new ReadOnlyTransactionImpl(this, timestampTracker, txId, localNodeId, readTimestamp);
}
/**
diff --git a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxMessageSender.java b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxMessageSender.java
index 628db89036..08c494b014 100644
--- a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxMessageSender.java
+++ b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxMessageSender.java
@@ -128,11 +128,11 @@ public class TxMessageSender {
/**
* Send a transactions finish request.
*
- * @param primaryConsistentId Node id to send the request to.
+ * @param primaryConsistentId Node consistent id to send the request to.
* @param commitPartition Partition to store a transaction state.
* @param replicationGroupIds Enlisted partition groups.
* @param txId Transaction id.
- * @param term Raft term.
+ * @param consistencyToken Enlistment consistency token.
* @param commit {@code true} if a commit requested.
* @param commitTimestamp Commit timestamp ({@code null} if it's an abort).
* @return Completable future of {@link TransactionResult}.
@@ -142,7 +142,7 @@ public class TxMessageSender {
TablePartitionId commitPartition,
Collection<ReplicationGroupId> replicationGroupIds,
UUID txId,
- Long term,
+ Long consistencyToken,
boolean commit,
@Nullable HybridTimestamp commitTimestamp
) {
@@ -155,7 +155,7 @@ public class TxMessageSender {
.groups(replicationGroupIds)
.commit(commit)
.commitTimestampLong(hybridTimestampToLong(commitTimestamp))
- .enlistmentConsistencyToken(term)
+ .enlistmentConsistencyToken(consistencyToken)
.build());
}
@@ -165,21 +165,21 @@ public class TxMessageSender {
* @param primaryConsistentId Node id to send the request to.
* @param txId Transaction id.
* @param commitGrpId Partition to store a transaction state.
- * @param term Raft term.
+ * @param consistencyToken Enlistment consistency token.
* @return Completable future of {@link TransactionMeta}.
*/
public CompletableFuture<TransactionMeta> resolveTxStateFromCommitPartition(
String primaryConsistentId,
UUID txId,
TablePartitionId commitGrpId,
- Long term
+ Long consistencyToken
) {
return replicaService.invoke(
primaryConsistentId,
FACTORY.txStateCommitPartitionRequest()
.groupId(commitGrpId)
.txId(txId)
- .enlistmentConsistencyToken(term)
+ .enlistmentConsistencyToken(consistencyToken)
.build());
}
diff --git a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/message/TxFinishReplicaRequest.java b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/message/TxFinishReplicaRequest.java
index 380a34044b..59dc9fef57 100644
--- a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/message/TxFinishReplicaRequest.java
+++ b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/message/TxFinishReplicaRequest.java
@@ -38,7 +38,7 @@ import org.jetbrains.annotations.Nullable;
* <li>Send cleanup requests to all enlisted primary replicas.</li>
* </ol>
*/
-@Transferable(value = TxMessageGroup.TX_FINISH_REQUEST)
+@Transferable(TxMessageGroup.TX_FINISH_REQUEST)
public interface TxFinishReplicaRequest extends PrimaryReplicaRequest, TimestampAware {
/**
* Returns transaction Id.
diff --git a/modules/transactions/src/test/java/org/apache/ignite/internal/tx/impl/ReadOnlyTransactionImplTest.java b/modules/transactions/src/test/java/org/apache/ignite/internal/tx/impl/ReadOnlyTransactionImplTest.java
index 167e02e123..bf8e303ca6 100644
--- a/modules/transactions/src/test/java/org/apache/ignite/internal/tx/impl/ReadOnlyTransactionImplTest.java
+++ b/modules/transactions/src/test/java/org/apache/ignite/internal/tx/impl/ReadOnlyTransactionImplTest.java
@@ -41,7 +41,7 @@ class ReadOnlyTransactionImplTest extends BaseIgniteAbstractTest {
HybridTimestamp readTimestamp = new HybridClockImpl().now();
UUID txId = TestTransactionIds.TRANSACTION_ID_GENERATOR.transactionIdFor(readTimestamp);
- var tx = new ReadOnlyTransactionImpl(txManager, new HybridTimestampTracker(), txId, readTimestamp);
+ var tx = new ReadOnlyTransactionImpl(txManager, new HybridTimestampTracker(), txId, "localId", readTimestamp);
assertThat(tx.startTimestamp(), is(readTimestamp));
}
diff --git a/modules/transactions/src/test/java/org/apache/ignite/internal/tx/impl/ReadWriteTransactionImplTest.java b/modules/transactions/src/test/java/org/apache/ignite/internal/tx/impl/ReadWriteTransactionImplTest.java
index cb9fe9d9cf..253e429910 100644
--- a/modules/transactions/src/test/java/org/apache/ignite/internal/tx/impl/ReadWriteTransactionImplTest.java
+++ b/modules/transactions/src/test/java/org/apache/ignite/internal/tx/impl/ReadWriteTransactionImplTest.java
@@ -80,7 +80,7 @@ class ReadWriteTransactionImplTest extends BaseIgniteAbstractTest {
UUID txId = TestTransactionIds.TRANSACTION_ID_GENERATOR.transactionIdFor(beginTs);
- var tx = new ReadWriteTransactionImpl(txManager, new HybridTimestampTracker(), txId);
+ var tx = new ReadWriteTransactionImpl(txManager, new HybridTimestampTracker(), txId, CLUSTER_NODE.id());
assertThat(tx.startTimestamp(), is(beginTs));
}
@@ -111,7 +111,7 @@ class ReadWriteTransactionImplTest extends BaseIgniteAbstractTest {
UUID txId = TestTransactionIds.TRANSACTION_ID_GENERATOR.transactionIdFor(beginTs);
- var tx = new ReadWriteTransactionImpl(txManager, new HybridTimestampTracker(), txId);
+ var tx = new ReadWriteTransactionImpl(txManager, new HybridTimestampTracker(), txId, CLUSTER_NODE.id());
tx.assignCommitPartition(TX_COMMIT_PART);