You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by vp...@apache.org on 2022/10/20 14:18:06 UTC
[ignite-3] branch ignite-3.0.0-beta1 updated: IGNITE-17637 Implement a commit partition path write intent resolution logic for RO reads (#1197)
This is an automated email from the ASF dual-hosted git repository.
vpyatkov pushed a commit to branch ignite-3.0.0-beta1
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/ignite-3.0.0-beta1 by this push:
new b689d02c2c IGNITE-17637 Implement a commit partition path write intent resolution logic for RO reads (#1197)
b689d02c2c is described below
commit b689d02c2ce7cd7797dbdfadd1701b4a55df3d64
Author: Vladislav Pyatkov <vl...@gmail.com>
AuthorDate: Wed Oct 19 12:39:51 2022 +0300
IGNITE-17637 Implement a commit partition path write intent resolution logic for RO reads (#1197)
(cherry picked from commit 7b0b3395de97db09896272e03322bba302c0b556)
---
.../distributed/ItTxDistributedTestSingleNode.java | 22 +-
.../internal/table/distributed/TableManager.java | 20 +-
.../replicator/PartitionReplicaListener.java | 269 ++++++++++-
.../distributed/replicator/PlacementDriver.java | 107 +++++
.../replication/PartitionReplicaListenerTest.java | 497 +++++++++++++++++++++
.../table/impl/DummyInternalTableImpl.java | 6 +-
.../ignite/internal/tx/message/TxMessageGroup.java | 5 +
...essageGroup.java => TxStateReplicaRequest.java} | 29 +-
8 files changed, 916 insertions(+), 39 deletions(-)
diff --git a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTxDistributedTestSingleNode.java b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTxDistributedTestSingleNode.java
index bda8d9fceb..f380f04e36 100644
--- a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTxDistributedTestSingleNode.java
+++ b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTxDistributedTestSingleNode.java
@@ -60,6 +60,7 @@ import org.apache.ignite.internal.table.TxAbstractTest;
import org.apache.ignite.internal.table.distributed.TableMessageGroup;
import org.apache.ignite.internal.table.distributed.raft.PartitionListener;
import org.apache.ignite.internal.table.distributed.replicator.PartitionReplicaListener;
+import org.apache.ignite.internal.table.distributed.replicator.PlacementDriver;
import org.apache.ignite.internal.table.distributed.storage.InternalTableImpl;
import org.apache.ignite.internal.table.impl.DummySchemaManagerImpl;
import org.apache.ignite.internal.thread.NamedThreadFactory;
@@ -77,6 +78,7 @@ import org.apache.ignite.network.ClusterService;
import org.apache.ignite.network.NetworkAddress;
import org.apache.ignite.network.NodeFinder;
import org.apache.ignite.network.StaticNodeFinder;
+import org.apache.ignite.network.TopologyService;
import org.apache.ignite.raft.client.Peer;
import org.apache.ignite.raft.client.service.RaftGroupService;
import org.apache.ignite.raft.jraft.RaftMessagesFactory;
@@ -119,6 +121,8 @@ public class ItTxDistributedTestSingleNode extends TxAbstractTest {
protected Map<ClusterNode, TxManager> txManagers;
+ protected Map<ClusterNode, TopologyService> topologyServices;
+
protected Int2ObjectOpenHashMap<RaftGroupService> accRaftClients;
protected Int2ObjectOpenHashMap<RaftGroupService> custRaftClients;
@@ -224,6 +228,7 @@ public class ItTxDistributedTestSingleNode extends TxAbstractTest {
// Start raft servers. Each raft server can hold multiple groups.
clocks = new HashMap<>(nodes);
raftServers = new HashMap<>(nodes);
+ topologyServices = new HashMap<>(nodes);
replicaManagers = new HashMap<>(nodes);
replicaServices = new HashMap<>(nodes);
txManagers = new HashMap<>(nodes);
@@ -244,6 +249,8 @@ public class ItTxDistributedTestSingleNode extends TxAbstractTest {
raftServers.put(node, raftSrv);
+ topologyServices.put(node, cluster.get(i).topologyService());
+
ReplicaManager replicaMgr = new ReplicaManager(
cluster.get(i),
clock,
@@ -362,6 +369,14 @@ public class ItTxDistributedTestSingleNode extends TxAbstractTest {
for (ClusterNode node : partNodes) {
var testMpPartStorage = new TestMvPartitionStorage(0);
+ var txSateStorage = new TestConcurrentHashMapTxStateStorage();
+ var placementDriver = new PlacementDriver(replicaServices.get(node));
+
+ for (int part = 0; part < assignment.size(); part++) {
+ String replicaGrpId = name + "-part-" + part;
+
+ placementDriver.updateAssignment(replicaGrpId, assignment.get(part));
+ }
int partId = p;
@@ -373,7 +388,7 @@ public class ItTxDistributedTestSingleNode extends TxAbstractTest {
() -> {
return new PartitionListener(
testMpPartStorage,
- new TestConcurrentHashMapTxStateStorage(),
+ txSateStorage,
txManagers.get(node),
primaryIndex
);
@@ -393,7 +408,10 @@ public class ItTxDistributedTestSingleNode extends TxAbstractTest {
grpId,
tblId,
primaryIndex,
- clocks.get(node)
+ clocks.get(node),
+ txSateStorage,
+ topologyServices.get(node),
+ placementDriver
));
} catch (NodeStoppingException e) {
fail("Unexpected node stopping", e);
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
index 35c33e3372..14a6cfe21a 100644
--- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
@@ -123,6 +123,7 @@ import org.apache.ignite.internal.table.distributed.raft.RebalanceRaftGroupEvent
import org.apache.ignite.internal.table.distributed.raft.snapshot.PartitionSnapshotStorageFactory;
import org.apache.ignite.internal.table.distributed.raft.snapshot.outgoing.OutgoingSnapshotsManager;
import org.apache.ignite.internal.table.distributed.replicator.PartitionReplicaListener;
+import org.apache.ignite.internal.table.distributed.replicator.PlacementDriver;
import org.apache.ignite.internal.table.distributed.storage.InternalTableImpl;
import org.apache.ignite.internal.table.event.TableEvent;
import org.apache.ignite.internal.table.event.TableEventParameters;
@@ -220,6 +221,9 @@ public class TableManager extends Producer<TableEvent, TableEventParameters> imp
/** Data storage manager. */
private final DataStorageManager dataStorageMgr;
+ /** Placement driver. */
+ private final PlacementDriver placementDriver;
+
/** Here a table future stores during creation (until the table can be provided to client). */
private final Map<UUID, CompletableFuture<Table>> tableCreateFuts = new ConcurrentHashMap<>();
@@ -333,6 +337,8 @@ public class TableManager extends Producer<TableEvent, TableEventParameters> imp
this.volatileLogStorageFactoryCreator = volatileLogStorageFactoryCreator;
this.clock = clock;
+ placementDriver = new PlacementDriver(replicaSvc);
+
netAddrResolver = addr -> {
ClusterNode node = topologyService.getByAddress(addr);
@@ -679,6 +685,8 @@ public class TableManager extends Producer<TableEvent, TableEventParameters> imp
String grpId = partitionRaftGroupName(tblId, partId);
+ placementDriver.updateAssignment(grpId, nodes);
+
CompletableFuture<Void> startGroupFut = CompletableFuture.completedFuture(null);
ConcurrentHashMap<ByteBuffer, RowId> primaryIndex = new ConcurrentHashMap<>();
@@ -786,7 +794,10 @@ public class TableManager extends Producer<TableEvent, TableEventParameters> imp
grpId,
tblId,
primaryIndex,
- clock
+ clock,
+ internalTbl.txStateStorage().getOrCreateTxStateStorage(partId),
+ topologyService,
+ placementDriver
)
);
} catch (NodeStoppingException ex) {
@@ -1699,6 +1710,8 @@ public class TableManager extends Producer<TableEvent, TableEventParameters> imp
? ((List<Set<ClusterNode>>) ByteUtils.fromBytes(tblCfg.assignments().value())).get(partId)
: ByteUtils.fromBytes(stableAssignments);
+ placementDriver.updateAssignment(grpId, assignments);
+
ClusterNode localMember = raftMgr.topologyService().localMember();
var deltaPeers = newPeers.stream()
@@ -1762,7 +1775,10 @@ public class TableManager extends Producer<TableEvent, TableEventParameters> imp
grpId,
tblId,
primaryIndex,
- clock
+ clock,
+ tbl.internalTable().txStateStorage().getOrCreateTxStateStorage(partId),
+ raftMgr.topologyService(),
+ placementDriver
)
);
}
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
index 259df84a01..64c8a2079f 100644
--- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
@@ -33,6 +33,7 @@ import java.util.concurrent.ConcurrentNavigableMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Supplier;
import java.util.stream.Collectors;
import org.apache.ignite.hlc.HybridClock;
import org.apache.ignite.hlc.HybridTimestamp;
@@ -66,13 +67,20 @@ import org.apache.ignite.internal.tx.LockKey;
import org.apache.ignite.internal.tx.LockManager;
import org.apache.ignite.internal.tx.LockMode;
import org.apache.ignite.internal.tx.TxManager;
+import org.apache.ignite.internal.tx.TxMeta;
+import org.apache.ignite.internal.tx.TxState;
import org.apache.ignite.internal.tx.message.TxCleanupReplicaRequest;
import org.apache.ignite.internal.tx.message.TxFinishReplicaRequest;
+import org.apache.ignite.internal.tx.message.TxMessagesFactory;
+import org.apache.ignite.internal.tx.message.TxStateReplicaRequest;
+import org.apache.ignite.internal.tx.storage.state.TxStateStorage;
import org.apache.ignite.lang.ErrorGroups.Replicator;
import org.apache.ignite.lang.IgniteBiTuple;
import org.apache.ignite.lang.IgniteInternalException;
import org.apache.ignite.lang.IgniteStringFormatter;
import org.apache.ignite.lang.IgniteUuid;
+import org.apache.ignite.network.NetworkAddress;
+import org.apache.ignite.network.TopologyService;
import org.apache.ignite.raft.client.Command;
import org.apache.ignite.raft.client.service.RaftGroupService;
import org.jetbrains.annotations.NotNull;
@@ -80,6 +88,9 @@ import org.jetbrains.annotations.Nullable;
/** Partition replication listener. */
public class PartitionReplicaListener implements ReplicaListener {
+ /** Tx messages factory. */
+ private static final TxMessagesFactory FACTORY = new TxMessagesFactory();
+
/** Replication group id. */
private final String replicationGroupId;
@@ -117,6 +128,24 @@ public class PartitionReplicaListener implements ReplicaListener {
*/
private final ConcurrentNavigableMap<IgniteUuid, PartitionTimestampCursor> cursors;
+ /** Tx state storage. */
+ private final TxStateStorage txStateStorage;
+
+ /** Topology service. */
+ private final TopologyService topologyService;
+
+ /** Hybrid clock. */
+ private final HybridClock hybridClock;
+
+ /** Placement Driver. */
+ private final PlacementDriver placementDriver;
+
+ /**
+ * Map used to coordinate clock updates made on behalf of read-only transactions with a concurrently assigned commit timestamp.
+ * TODO: IGNITE-17261 Review this once the commit timestamp is provided by the commit request (request.commitTimestamp()).
+ */
+ ConcurrentHashMap<UUID, CompletableFuture<TxMeta>> txTimestampUpdateMap = new ConcurrentHashMap<>();
+
/**
* The constructor.
*
@@ -124,9 +153,14 @@ public class PartitionReplicaListener implements ReplicaListener {
* @param raftClient Raft client.
* @param txManager Transaction manager.
* @param lockManager Lock manager.
+ * @param partId Partition id.
+ * @param replicationGroupId Replication group id.
* @param tableId Table id.
* @param primaryIndex Primary index.
* @param hybridClock Hybrid clock.
+ * @param txStateStorage Transaction state storage.
+ * @param topologyService Topology service.
+ * @param placementDriver Placement driver.
*/
public PartitionReplicaListener(
MvPartitionStorage mvDataStorage,
@@ -137,7 +171,10 @@ public class PartitionReplicaListener implements ReplicaListener {
String replicationGroupId,
UUID tableId,
ConcurrentHashMap<ByteBuffer, RowId> primaryIndex,
- HybridClock hybridClock
+ HybridClock hybridClock,
+ TxStateStorage txStateStorage,
+ TopologyService topologyService,
+ PlacementDriver placementDriver
) {
this.mvDataStorage = mvDataStorage;
this.raftClient = raftClient;
@@ -147,6 +184,10 @@ public class PartitionReplicaListener implements ReplicaListener {
this.replicationGroupId = replicationGroupId;
this.tableId = tableId;
this.primaryIndex = primaryIndex;
+ this.hybridClock = hybridClock;
+ this.txStateStorage = txStateStorage;
+ this.topologyService = topologyService;
+ this.placementDriver = placementDriver;
//TODO: IGNITE-17479 Integrate indexes into replicaListener command handlers
this.indexScanId = new UUID(tableId.getMostSignificantBits(), tableId.getLeastSignificantBits() + 1);
@@ -170,6 +211,10 @@ public class PartitionReplicaListener implements ReplicaListener {
/** {@inheritDoc} */
@Override
public CompletableFuture<Object> invoke(ReplicaRequest request) {
+ if (request instanceof TxStateReplicaRequest) {
+ return processTxStateReplicaRequest((TxStateReplicaRequest) request);
+ }
+
return ensureReplicaIsPrimary(request)
.thenCompose((ignore) -> {
if (request instanceof ReadWriteSingleRowReplicaRequest) {
@@ -200,6 +245,60 @@ public class PartitionReplicaListener implements ReplicaListener {
});
}
+ /**
+ * Processes a transaction state request.
+ *
+ * @param request Transaction state request.
+ * @return Result future.
+ */
+ private CompletableFuture<Object> processTxStateReplicaRequest(TxStateReplicaRequest request) {
+ return raftClient.refreshAndGetLeaderWithTerm()
+ .thenCompose(replicaAndTerm -> {
+ NetworkAddress leaderAddress = replicaAndTerm.get1().address();
+
+ if (topologyService.localMember().address().equals(leaderAddress)) {
+
+ CompletableFuture<TxMeta> txStateFut = getTxStateConcurrently(request);
+
+ return txStateFut.thenApply(txMeta -> new IgniteBiTuple<>(txMeta, null));
+ } else {
+ return CompletableFuture.completedFuture(
+ new IgniteBiTuple<>(null, topologyService.getByAddress(leaderAddress)));
+ }
+ }
+ );
+ }
+
+ /**
+ * Gets a transaction state or {@code null}, if the transaction is not completed.
+ *
+ * @param txStateReq Transaction state request.
+ * @return Future to transaction state meta or {@code null}.
+ */
+ private CompletableFuture<TxMeta> getTxStateConcurrently(TxStateReplicaRequest txStateReq) {
+ //TODO: IGNITE-17261 Review this once the commit timestamp is provided by the commit request (request.commitTimestamp()).
+ CompletableFuture<TxMeta> txStateFut = new CompletableFuture<>();
+
+ txTimestampUpdateMap.compute(txStateReq.txId(), (uuid, fut) -> {
+ if (fut != null) {
+ fut.thenAccept(txMeta -> txStateFut.complete(txMeta));
+ } else {
+ TxMeta txMeta = txStateStorage.get(txStateReq.txId());
+
+ if (txMeta == null) {
+ // All future transactions will be committed after the resolution is processed.
+ hybridClock.update(txStateReq.commitTimestamp());
+ }
+
+ txStateFut.complete(txMeta);
+ }
+
+ return null;
+ });
+
+ return txStateFut;
+ }
+
/**
* Processes retrieve batch for read only transaction.
*
@@ -209,6 +308,7 @@ public class PartitionReplicaListener implements ReplicaListener {
private CompletableFuture<Object> processReadOnlyScanRetrieveBatchAction(ReadOnlyScanRetrieveBatchReplicaRequest request) {
UUID txId = request.transactionId();
int batchCount = request.batchSize();
+ HybridTimestamp timestamp = request.timestamp();
IgniteUuid cursorId = new IgniteUuid(txId, request.scanId());
@@ -218,7 +318,7 @@ public class PartitionReplicaListener implements ReplicaListener {
id -> mvDataStorage.scan(HybridTimestamp.MAX_VALUE));
while (batchRows.size() < batchCount && cursor.hasNext()) {
- BinaryRow resolvedReadResult = resolveReadResult(cursor.next(), null);
+ BinaryRow resolvedReadResult = resolveReadResult(cursor.next(), timestamp, () -> cursor.committed(timestamp));
if (resolvedReadResult != null) {
batchRows.add(resolvedReadResult);
@@ -239,7 +339,7 @@ public class PartitionReplicaListener implements ReplicaListener {
UUID indexId = indexIdOrDefault(indexScanId/*request.indexToUse()*/);
- if (request.requestType() != RequestType.RO_GET) {
+ if (request.requestType() != RequestType.RO_GET) {
throw new IgniteInternalException(Replicator.REPLICA_COMMON_ERR,
IgniteStringFormatter.format("Unknown single request [actionType={}]", request.requestType()));
}
@@ -247,7 +347,21 @@ public class PartitionReplicaListener implements ReplicaListener {
//TODO: IGNITE-17868 Integrate indexes into rowIds resolution along with proper lock management on search rows.
RowId rowId = rowIdByKey(indexId, searchKey);
- BinaryRow result = rowId != null ? resolveReadResult(mvDataStorage.read(rowId, request.timestamp()), null) : null;
+ ReadResult readResult = rowId == null ? null : mvDataStorage.read(rowId, request.timestamp());
+
+ BinaryRow result = readResult == null ? null : resolveReadResult(readResult, request.timestamp(), () -> {
+ if (readResult.newestCommitTimestamp() == null) {
+ return null;
+ }
+
+ ReadResult committedReadResult = mvDataStorage.read(rowId, readResult.newestCommitTimestamp());
+
+ assert !committedReadResult.isWriteIntent() :
+ "The result is not committed [rowId=" + rowId + ", timestamp="
+ + readResult.newestCommitTimestamp() + ']';
+
+ return committedReadResult.binaryRow();
+ });
return CompletableFuture.completedFuture(result);
}
@@ -264,7 +378,7 @@ public class PartitionReplicaListener implements ReplicaListener {
UUID indexId = indexIdOrDefault(indexScanId/*request.indexToUse()*/);
- if (request.requestType() != RequestType.RO_GET_ALL) {
+ if (request.requestType() != RequestType.RO_GET_ALL) {
throw new IgniteInternalException(Replicator.REPLICA_COMMON_ERR,
IgniteStringFormatter.format("Unknown single request [actionType={}]", request.requestType()));
}
@@ -275,7 +389,21 @@ public class PartitionReplicaListener implements ReplicaListener {
//TODO: IGNITE-17868 Integrate indexes into rowIds resolution along with proper lock management on search rows.
RowId rowId = rowIdByKey(indexId, searchKey);
- result.add(rowId != null ? resolveReadResult(mvDataStorage.read(rowId, request.timestamp()), null) : null);
+ ReadResult readResult = rowId == null ? null : mvDataStorage.read(rowId, request.timestamp());
+
+ result.add(readResult == null ? null : resolveReadResult(readResult, request.timestamp(), () -> {
+ if (readResult.newestCommitTimestamp() == null) {
+ return null;
+ }
+
+ ReadResult committedReadResult = mvDataStorage.read(rowId, readResult.newestCommitTimestamp());
+
+ assert !committedReadResult.isWriteIntent() :
+ "The result is not committed [rowId=" + rowId + ", timestamp="
+ + readResult.newestCommitTimestamp() + ']';
+
+ return committedReadResult.binaryRow();
+ }));
}
return CompletableFuture.completedFuture(result);
@@ -390,14 +518,7 @@ public class PartitionReplicaListener implements ReplicaListener {
boolean commit = request.commit();
- CompletableFuture<Object> changeStateFuture = raftClient.run(
- new FinishTxCommand(
- txId,
- commit,
- request.commitTimestamp(),
- aggregatedGroupIds
- )
- );
+ CompletableFuture<Object> changeStateFuture = finishTransaction(aggregatedGroupIds, txId, commit);
// TODO: https://issues.apache.org/jira/browse/IGNITE-17578 Cleanup process should be asynchronous.
CompletableFuture[] cleanupFutures = new CompletableFuture[request.groups().size()];
@@ -419,6 +540,38 @@ public class PartitionReplicaListener implements ReplicaListener {
return allOf(cleanupFutures).thenApply(ignored -> null);
}
+ /**
+ * Finishes a transaction.
+ *
+ * @param aggregatedGroupIds Identifiers of the replication groups that are enlisted in the transaction.
+ * @param txId Transaction id.
+ * @param commit True if the transaction is committed, false otherwise.
+ * @return Future to wait for the transaction finish.
+ */
+ private CompletableFuture<Object> finishTransaction(List<String> aggregatedGroupIds, UUID txId, boolean commit) {
+ // TODO: IGNITE-17261 The timestamp from the request (request.commitTimestamp()) is not used until the issue is fixed.
+ var fut = new CompletableFuture<TxMeta>();
+
+ txTimestampUpdateMap.put(txId, fut);
+
+ HybridTimestamp commitTimestamp = hybridClock.now();
+
+ CompletableFuture<Object> changeStateFuture = raftClient.run(
+ new FinishTxCommand(
+ txId,
+ commit,
+ commitTimestamp,
+ aggregatedGroupIds
+ )
+ ).whenComplete((o, throwable) -> {
+ fut.complete(new TxMeta(commit ? TxState.COMMITED : TxState.ABORTED, aggregatedGroupIds, commitTimestamp));
+
+ txTimestampUpdateMap.remove(txId);
+ });
+
+ return changeStateFuture;
+ }
+
/**
* Processes transaction cleanup request:
@@ -1155,6 +1308,29 @@ public class PartitionReplicaListener implements ReplicaListener {
}
}
+ /**
+ * Resolves a read result for RW transaction.
+ *
+ * @param readResult Read result to resolve.
+ * @param txId Transaction id.
+ * @return Resolved binary row.
+ */
+ private BinaryRow resolveReadResult(ReadResult readResult, UUID txId) {
+ return resolveReadResult(readResult, txId, null, null);
+ }
+
+ /**
+ * Resolves a read result for RO transaction.
+ *
+ * @param readResult Read result to resolve.
+ * @param timestamp Timestamp.
+ * @param lastCommitted Action to get the latest committed row.
+ * @return Resolved binary row.
+ */
+ private BinaryRow resolveReadResult(ReadResult readResult, HybridTimestamp timestamp, Supplier<BinaryRow> lastCommitted) {
+ return resolveReadResult(readResult, null, timestamp, lastCommitted);
+ }
+
/**
* Resolves read result to the corresponding binary row. Following rules are used for read result resolution:
* <ol>
@@ -1167,9 +1343,16 @@ public class PartitionReplicaListener implements ReplicaListener {
*
* @param readResult Read result to resolve.
* @param txId Nullable transaction id, should be provided if resolution is performed within the context of RW transaction.
+ * @param timestamp Timestamp is used in RO transaction only.
+ * @param lastCommitted Action to get the latest committed row, it is used in RO transaction only.
* @return Resolved binary row.
*/
- private BinaryRow resolveReadResult(ReadResult readResult, @Nullable UUID txId) {
+ private BinaryRow resolveReadResult(
+ ReadResult readResult,
+ @Nullable UUID txId,
+ @Nullable HybridTimestamp timestamp,
+ @Nullable Supplier<BinaryRow> lastCommitted
+ ) {
if (readResult == null) {
return null;
} else {
@@ -1186,10 +1369,62 @@ public class PartitionReplicaListener implements ReplicaListener {
+ " actualTxId={" + retrievedResultTxId + '}');
}
} else {
+ if (!readResult.isWriteIntent()) {
+ return readResult.binaryRow();
+ }
+
+ CompletableFuture<BinaryRow> writeIntentResolutionFut = resolveWriteIntentAsync(
+ readResult, timestamp, lastCommitted);
+
// RO request.
- // TODO: IGNITE-17637 Implement a commit partition path write intent resolution logic
- return readResult.binaryRow();
+ return writeIntentResolutionFut.join();
}
}
}
+
+ /**
+ * Resolves a read result to the matched row.
+ * If the result does not match any row, the method returns a future to {@code null}.
+ *
+ * @param readResult Read result.
+ * @param timestamp Timestamp.
+ * @param lastCommitted Action to get a last committed row.
+ * @return Result future.
+ */
+ private CompletableFuture<BinaryRow> resolveWriteIntentAsync(
+ ReadResult readResult,
+ HybridTimestamp timestamp,
+ Supplier<BinaryRow> lastCommitted
+ ) {
+ String commitGrpId = partitionRaftGroupName(readResult.commitTableId(), readResult.commitPartitionId());
+
+ return placementDriver.sendMetaRequest(commitGrpId, FACTORY.txStateReplicaRequest()
+ .groupId(commitGrpId)
+ .commitTimestamp(timestamp)
+ .txId(readResult.transactionId())
+ .build())
+ .thenApply(txMeta -> {
+ if (txMeta == null) {
+ return lastCommitted.get();
+ } else if (txMeta.txState() == TxState.COMMITED && txMeta.commitTimestamp().compareTo(timestamp) <= 0) {
+ return readResult.binaryRow();
+ } else {
+ assert txMeta.txState() == TxState.ABORTED : "Unexpected transaction state [state=" + txMeta.txState() + ']';
+
+ return lastCommitted.get();
+ }
+ });
+ }
+
+ /**
+ * Compounds a RAFT group unique name.
+ *
+ * @param tblId Table identifier.
+ * @param partition Partition id.
+ * @return A RAFT group name.
+ */
+ @NotNull
+ private String partitionRaftGroupName(UUID tblId, int partition) {
+ return tblId + "_part_" + partition;
+ }
}
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PlacementDriver.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PlacementDriver.java
new file mode 100644
index 0000000000..fc2e920ce8
--- /dev/null
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PlacementDriver.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.table.distributed.replicator;
+
+import java.util.Collection;
+import java.util.LinkedHashSet;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import org.apache.ignite.internal.replicator.ReplicaService;
+import org.apache.ignite.internal.tx.TxMeta;
+import org.apache.ignite.internal.tx.message.TxStateReplicaRequest;
+import org.apache.ignite.lang.IgniteBiTuple;
+import org.apache.ignite.network.ClusterNode;
+
+/**
+ * Placement driver.
+ */
+public class PlacementDriver {
+ /** Assignment nodes per replication group. */
+ private final Map<String, LinkedHashSet<ClusterNode>> primaryReplicaMapping = new ConcurrentHashMap<>();
+
+ /** Replication service. */
+ private final ReplicaService replicaService;
+
+ /**
+ * The constructor.
+ *
+ * @param replicaService Replication service.
+ */
+ public PlacementDriver(ReplicaService replicaService) {
+ this.replicaService = replicaService;
+ }
+
+ /**
+ * Sends a transaction state request to the primary replica.
+ *
+ * @param replicaGrp Replication group id.
+ * @param request Status request.
+ * @return Result future.
+ */
+ public CompletableFuture<TxMeta> sendMetaRequest(String replicaGrp, TxStateReplicaRequest request) {
+ CompletableFuture<TxMeta> resFut = new CompletableFuture<>();
+
+ sendAndRetry(resFut, replicaGrp, request);
+
+ return resFut;
+ }
+
+ /**
+ * Updates an assignment for the specific replication group.
+ *
+ * @param replicaGrpId Replication group id.
+ * @param assignment Assignment.
+ */
+ public void updateAssignment(String replicaGrpId, Collection<ClusterNode> assignment) {
+ primaryReplicaMapping.put(replicaGrpId, new LinkedHashSet<>(assignment));
+ }
+
+ /**
+ * Tries to send a request to primary replica of the replication group.
+ * If the first node turns out not to be the primary one, the same request is sent to the new primary node.
+ *
+ * @param resFut Response future.
+ * @param replicaGrp Replication group id.
+ * @param request Request.
+ */
+ private void sendAndRetry(CompletableFuture<TxMeta> resFut, String replicaGrp, TxStateReplicaRequest request) {
+ ClusterNode nodeToSend = primaryReplicaMapping.get(replicaGrp).iterator().next();
+
+ replicaService.invoke(nodeToSend, request).thenAccept(resp -> {
+ assert resp instanceof IgniteBiTuple : "Unsupported response type [type=" + resp.getClass().getSimpleName() + ']';
+
+ IgniteBiTuple<TxMeta, ClusterNode> stateAndLeader = (IgniteBiTuple) resp;
+
+ ClusterNode nextNodeToSend = stateAndLeader.get2();
+
+ if (nextNodeToSend == null) {
+ resFut.complete(stateAndLeader.get1());
+ } else {
+ LinkedHashSet<ClusterNode> newAssignment = new LinkedHashSet<>();
+
+ newAssignment.add(nextNodeToSend);
+ newAssignment.addAll(primaryReplicaMapping.get(replicaGrp));
+
+ primaryReplicaMapping.put(replicaGrp, newAssignment);
+
+ sendAndRetry(resFut, replicaGrp, request);
+ }
+ });
+ }
+}
diff --git a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerTest.java b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerTest.java
new file mode 100644
index 0000000000..f25659b24a
--- /dev/null
+++ b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerTest.java
@@ -0,0 +1,497 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.table.distributed.replication;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.nio.ByteBuffer;
+import java.util.Collections;
+import java.util.Locale;
+import java.util.Objects;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import org.apache.ignite.hlc.HybridClock;
+import org.apache.ignite.hlc.HybridTimestamp;
+import org.apache.ignite.internal.schema.BinaryRow;
+import org.apache.ignite.internal.schema.Column;
+import org.apache.ignite.internal.schema.NativeTypes;
+import org.apache.ignite.internal.schema.SchemaDescriptor;
+import org.apache.ignite.internal.schema.marshaller.KvMarshaller;
+import org.apache.ignite.internal.schema.marshaller.MarshallerException;
+import org.apache.ignite.internal.schema.marshaller.MarshallerFactory;
+import org.apache.ignite.internal.schema.marshaller.reflection.ReflectionMarshallerFactory;
+import org.apache.ignite.internal.schema.row.Row;
+import org.apache.ignite.internal.storage.RowId;
+import org.apache.ignite.internal.storage.impl.TestMvPartitionStorage;
+import org.apache.ignite.internal.table.distributed.TableMessagesFactory;
+import org.apache.ignite.internal.table.distributed.replicator.PartitionReplicaListener;
+import org.apache.ignite.internal.table.distributed.replicator.PlacementDriver;
+import org.apache.ignite.internal.table.distributed.replicator.action.RequestType;
+import org.apache.ignite.internal.testframework.IgniteAbstractTest;
+import org.apache.ignite.internal.tostring.IgniteToStringInclude;
+import org.apache.ignite.internal.tostring.S;
+import org.apache.ignite.internal.tx.Timestamp;
+import org.apache.ignite.internal.tx.TxManager;
+import org.apache.ignite.internal.tx.TxMeta;
+import org.apache.ignite.internal.tx.TxState;
+import org.apache.ignite.internal.tx.impl.HeapLockManager;
+import org.apache.ignite.internal.tx.message.TxMessagesFactory;
+import org.apache.ignite.internal.tx.storage.state.test.TestConcurrentHashMapTxStateStorage;
+import org.apache.ignite.lang.IgniteBiTuple;
+import org.apache.ignite.lang.IgniteException;
+import org.apache.ignite.network.ClusterNode;
+import org.apache.ignite.network.NetworkAddress;
+import org.apache.ignite.network.TopologyService;
+import org.apache.ignite.raft.client.Peer;
+import org.apache.ignite.raft.client.service.RaftGroupService;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.mockito.Mock;
+
+/** Tests for the partition replica listener. */
+public class PartitionReplicaListenerTest extends IgniteAbstractTest {
+ /** Tx messages factory. */
+ private static final TxMessagesFactory TX_MESSAGES_FACTORY = new TxMessagesFactory();
+
+ /** Table messages factory. */
+ private static final TableMessagesFactory TABLE_MESSAGES_FACTORY = new TableMessagesFactory();
+
+ /** Partition id. */
+ private static final int partId = 0;
+
+ /** Table id. */
+ private static final UUID tblId = UUID.randomUUID();
+
+ /** Replication group id. */
+ private static final String grpId = tblId + "_part_" + partId;
+
+ /** Primary index map. */
+ private static final ConcurrentHashMap<ByteBuffer, RowId> primaryIndex = new ConcurrentHashMap<>();
+
+ /** Hybrid clock. */
+ private static final HybridClock clock = new HybridClock();
+
+ /** The storage stores transaction states. */
+ private static final TestConcurrentHashMapTxStateStorage txStateStorage = new TestConcurrentHashMapTxStateStorage();
+
+ /** The storage stores partition data. */
+ private static final TestMvPartitionStorage testMvPartitionStorage = new TestMvPartitionStorage(partId);
+
+ /** Local cluster node. */
+ private static final ClusterNode localNode = new ClusterNode("node1", "node1", NetworkAddress.from("127.0.0.1:127"));
+
+ /** Another (not local) cluster node. */
+ private static final ClusterNode anotherNode = new ClusterNode("node2", "node2", NetworkAddress.from("127.0.0.2:127"));
+
+ /** Mocked placement driver; its {@code sendMetaRequest} answer is stubbed in {@code beforeAll()} and depends on {@code txState}. */
+ private static PlacementDriver placementDriver = mock(PlacementDriver.class);
+
+ /** Mocked raft client; the leader it reports is stubbed in {@code beforeAll()} and depends on {@code localLeader}. */
+ // NOTE(review): @Mock looks redundant because the field is initialized explicitly with mock(...) - confirm and remove.
+ @Mock
+ private static RaftGroupService mockRaftClient = mock(RaftGroupService.class);
+
+ /** Mocked topology service; stubbed in {@code beforeAll()} to resolve the two test nodes by address. */
+ @Mock
+ private static TopologyService topologySrv = mock(TopologyService.class);
+
+ /** Default reflection marshaller factory. */
+ protected static MarshallerFactory marshallerFactory;
+
+ /** Schema descriptor for tests. */
+ protected static SchemaDescriptor schemaDescriptor;
+
+ /** Key-value marshaller for tests. */
+ protected static KvMarshaller<TestKey, TestValue> kvMarshaller;
+
+ /** Partition replication listener to test. */
+ private static PartitionReplicaListener partitionReplicaListener;
+
+ /** If true the local replica is considered leader, false otherwise. */
+ private static boolean localLeader;
+
+ /** The state is used to resolve write intent. */
+ private static TxState txState;
+
+ /**
+  * Stubs the mocked collaborators and creates the listener under test together with the marshalling helpers.
+  *
+  * <p>The stubs read the static {@code localLeader} and {@code txState} switches at invocation time, so a test can change the
+  * simulated cluster state without re-stubbing. The method is not private because JUnit Jupiter lifecycle methods must not be.
+  */
+ @BeforeAll
+ static void beforeAll() {
+     // The raft client reports either the local node or the other node as the leader, always with term 1.
+     when(mockRaftClient.refreshAndGetLeaderWithTerm()).thenAnswer(invocationOnMock -> {
+         ClusterNode leader = localLeader ? localNode : anotherNode;
+
+         return CompletableFuture.completedFuture(new IgniteBiTuple<>(new Peer(leader.address()), 1L));
+     });
+
+     // Only the two test nodes can be resolved by address.
+     when(topologySrv.getByAddress(any())).thenAnswer(invocationOnMock -> {
+         NetworkAddress addr = invocationOnMock.getArgument(0);
+
+         if (addr.equals(anotherNode.address())) {
+             return anotherNode;
+         } else if (addr.equals(localNode.address())) {
+             return localNode;
+         } else {
+             return null;
+         }
+     });
+
+     when(topologySrv.localMember()).thenReturn(localNode);
+
+     HybridTimestamp txFixedTimestamp = clock.now();
+
+     // The placement driver answers with a transaction meta that mirrors the txState switch (no meta when it is null).
+     when(placementDriver.sendMetaRequest(eq(grpId), any())).thenAnswer(invocationOnMock -> {
+         TxMeta txMeta;
+
+         if (txState == null) {
+             txMeta = null;
+         } else if (txState == TxState.COMMITED) {
+             txMeta = new TxMeta(TxState.COMMITED, Collections.singletonList(grpId), txFixedTimestamp);
+         } else {
+             assert txState == TxState.ABORTED : "State is " + txState;
+
+             txMeta = new TxMeta(TxState.ABORTED, Collections.singletonList(grpId), txFixedTimestamp);
+         }
+
+         return CompletableFuture.completedFuture(txMeta);
+     });
+
+     partitionReplicaListener = new PartitionReplicaListener(
+             testMvPartitionStorage,
+             mockRaftClient,
+             mock(TxManager.class),
+             new HeapLockManager(),
+             partId,
+             grpId,
+             tblId,
+             primaryIndex,
+             clock,
+             txStateStorage,
+             topologySrv,
+             placementDriver
+     );
+
+     marshallerFactory = new ReflectionMarshallerFactory();
+
+     schemaDescriptor = new SchemaDescriptor(1, new Column[]{
+             new Column("intKey".toUpperCase(Locale.ROOT), NativeTypes.INT32, false),
+             new Column("strKey".toUpperCase(Locale.ROOT), NativeTypes.STRING, false),
+     }, new Column[]{
+             new Column("intVal".toUpperCase(Locale.ROOT), NativeTypes.INT32, false),
+             new Column("strVal".toUpperCase(Locale.ROOT), NativeTypes.STRING, false),
+     });
+
+     kvMarshaller = marshallerFactory.create(schemaDescriptor, TestKey.class, TestValue.class);
+ }
+
+ /**
+  * Resets the per-test switches: the local node is the leader, no transaction state is simulated and the primary index is empty.
+  * The method is not private because JUnit Jupiter lifecycle methods must not be.
+  */
+ @BeforeEach
+ void beforeTest() {
+     localLeader = true;
+     txState = null;
+     primaryIndex.clear();
+ }
+
+ /** A state request for a transaction that was never stored returns neither a transaction meta nor a node to redirect to. */
+ @Test
+ public void testTxStateReplicaRequestEmptyState() {
+     CompletableFuture<?> fut = partitionReplicaListener.invoke(TX_MESSAGES_FACTORY.txStateReplicaRequest()
+             .groupId(grpId)
+             .commitTimestamp(clock.now())
+             .txId(Timestamp.nextVersion().toUuid())
+             .build());
+
+     // The response pairs the transaction meta with the node to retry on, not a raft peer with a term.
+     IgniteBiTuple<TxMeta, ClusterNode> tuple = (IgniteBiTuple<TxMeta, ClusterNode>) fut.join();
+
+     assertNull(tuple.get1());
+     assertNull(tuple.get2());
+ }
+
+ /** A state request for a stored committed transaction returns its meta, with a commit timestamp older than the read one. */
+ @Test
+ public void testTxStateReplicaRequestCommitState() {
+     UUID txId = Timestamp.nextVersion().toUuid();
+
+     txStateStorage.put(txId, new TxMeta(TxState.COMMITED, Collections.singletonList(grpId), clock.now()));
+
+     // Taken after the commit timestamp above, so it must compare as greater.
+     HybridTimestamp readTimestamp = clock.now();
+
+     CompletableFuture<?> fut = partitionReplicaListener.invoke(TX_MESSAGES_FACTORY.txStateReplicaRequest()
+             .groupId(grpId)
+             .commitTimestamp(readTimestamp)
+             .txId(txId)
+             .build());
+
+     IgniteBiTuple<TxMeta, ClusterNode> tuple = (IgniteBiTuple<TxMeta, ClusterNode>) fut.join();
+
+     assertEquals(TxState.COMMITED, tuple.get1().txState());
+     assertTrue(readTimestamp.compareTo(tuple.get1().commitTimestamp()) > 0);
+     assertNull(tuple.get2());
+ }
+
+ /** When the local replica is not the leader, a state request returns no meta but does return a node to redirect to. */
+ @Test
+ public void testTxStateReplicaRequestMissLeaderMiss() {
+     localLeader = false;
+
+     CompletableFuture<?> fut = partitionReplicaListener.invoke(TX_MESSAGES_FACTORY.txStateReplicaRequest()
+             .groupId(grpId)
+             .commitTimestamp(clock.now())
+             .txId(Timestamp.nextVersion().toUuid())
+             .build());
+
+     // The response pairs the transaction meta with the node to retry on, not a raft peer with a term.
+     IgniteBiTuple<TxMeta, ClusterNode> tuple = (IgniteBiTuple<TxMeta, ClusterNode>) fut.join();
+
+     assertNull(tuple.get1());
+     assertNotNull(tuple.get2());
+ }
+
+ /** A read-only get of a key that was never written returns no row. */
+ @Test
+ public void testReadOnlySingleRowReplicaRequestEmptyResult() {
+     BinaryRow testBinaryKey = nextBinaryKey();
+
+     CompletableFuture<?> fut = partitionReplicaListener.invoke(TABLE_MESSAGES_FACTORY.readOnlySingleRowReplicaRequest()
+             .groupId(grpId)
+             .timestamp(clock.now())
+             .transactionId(Timestamp.nextVersion().toUuid())
+             .binaryRow(testBinaryKey)
+             .requestType(RequestType.RO_GET)
+             .build());
+
+     BinaryRow binaryRow = (BinaryRow) fut.join();
+
+     assertNull(binaryRow);
+ }
+
+ /** A read-only get issued after a write was committed returns a row. */
+ @Test
+ public void testReadOnlySingleRowReplicaRequestCommittedResult() {
+     UUID txId = Timestamp.nextVersion().toUuid();
+     BinaryRow testBinaryKey = nextBinaryKey();
+     BinaryRow testBinaryRow = binaryRow(key(testBinaryKey), new TestValue(1, "v1"));
+     var rowId = new RowId(partId);
+
+     primaryIndex.put(testBinaryKey.keySlice(), rowId);
+     testMvPartitionStorage.addWrite(rowId, testBinaryRow, txId, tblId, partId);
+     testMvPartitionStorage.commitWrite(rowId, clock.now());
+
+     CompletableFuture<?> fut = partitionReplicaListener.invoke(TABLE_MESSAGES_FACTORY.readOnlySingleRowReplicaRequest()
+             .groupId(grpId)
+             .timestamp(clock.now())
+             .transactionId(Timestamp.nextVersion().toUuid())
+             .binaryRow(testBinaryKey)
+             .requestType(RequestType.RO_GET)
+             .build());
+
+     BinaryRow binaryRow = (BinaryRow) fut.join();
+
+     assertNotNull(binaryRow);
+ }
+
+ /** A read-only get over an uncommitted write returns a row when the placement driver reports the transaction as committed. */
+ @Test
+ public void testReadOnlySingleRowReplicaRequestResolveWriteIntentCommitted() {
+     UUID txId = Timestamp.nextVersion().toUuid();
+     BinaryRow testBinaryKey = nextBinaryKey();
+     BinaryRow testBinaryRow = binaryRow(key(testBinaryKey), new TestValue(1, "v1"));
+     var rowId = new RowId(partId);
+     txState = TxState.COMMITED;
+
+     // The write is added but deliberately not committed in the storage.
+     primaryIndex.put(testBinaryKey.keySlice(), rowId);
+     testMvPartitionStorage.addWrite(rowId, testBinaryRow, txId, tblId, partId);
+
+     CompletableFuture<?> fut = partitionReplicaListener.invoke(TABLE_MESSAGES_FACTORY.readOnlySingleRowReplicaRequest()
+             .groupId(grpId)
+             .timestamp(clock.now())
+             .transactionId(Timestamp.nextVersion().toUuid())
+             .binaryRow(testBinaryKey)
+             .requestType(RequestType.RO_GET)
+             .build());
+
+     BinaryRow binaryRow = (BinaryRow) fut.join();
+
+     assertNotNull(binaryRow);
+ }
+
+ /** A read-only get over an uncommitted write returns no row when the placement driver reports no transaction meta. */
+ @Test
+ public void testReadOnlySingleRowReplicaRequestResolveWriteIntentPending() {
+     UUID txId = Timestamp.nextVersion().toUuid();
+     BinaryRow testBinaryKey = nextBinaryKey();
+     BinaryRow testBinaryRow = binaryRow(key(testBinaryKey), new TestValue(1, "v1"));
+     var rowId = new RowId(partId);
+
+     // The write is added but deliberately not committed in the storage.
+     primaryIndex.put(testBinaryKey.keySlice(), rowId);
+     testMvPartitionStorage.addWrite(rowId, testBinaryRow, txId, tblId, partId);
+
+     CompletableFuture<?> fut = partitionReplicaListener.invoke(TABLE_MESSAGES_FACTORY.readOnlySingleRowReplicaRequest()
+             .groupId(grpId)
+             .timestamp(clock.now())
+             .transactionId(Timestamp.nextVersion().toUuid())
+             .binaryRow(testBinaryKey)
+             .requestType(RequestType.RO_GET)
+             .build());
+
+     BinaryRow binaryRow = (BinaryRow) fut.join();
+
+     assertNull(binaryRow);
+ }
+
+ /** A read-only get over an uncommitted write returns no row when the placement driver reports the transaction as aborted. */
+ @Test
+ public void testReadOnlySingleRowReplicaRequestResolveWriteIntentAborted() {
+     UUID txId = Timestamp.nextVersion().toUuid();
+     BinaryRow testBinaryKey = nextBinaryKey();
+     BinaryRow testBinaryRow = binaryRow(key(testBinaryKey), new TestValue(1, "v1"));
+     var rowId = new RowId(partId);
+     txState = TxState.ABORTED;
+
+     // The write is added but deliberately not committed in the storage.
+     primaryIndex.put(testBinaryKey.keySlice(), rowId);
+     testMvPartitionStorage.addWrite(rowId, testBinaryRow, txId, tblId, partId);
+
+     CompletableFuture<?> fut = partitionReplicaListener.invoke(TABLE_MESSAGES_FACTORY.readOnlySingleRowReplicaRequest()
+             .groupId(grpId)
+             .timestamp(clock.now())
+             .transactionId(Timestamp.nextVersion().toUuid())
+             .binaryRow(testBinaryKey)
+             .requestType(RequestType.RO_GET)
+             .build());
+
+     BinaryRow binaryRow = (BinaryRow) fut.join();
+
+     assertNull(binaryRow);
+ }
+
+ /** Marshals a key built from the current {@link System#nanoTime()} value truncated to {@code int}. */
+ protected static BinaryRow nextBinaryKey() {
+     int seed = (int) System.nanoTime();
+     TestKey freshKey = new TestKey(seed, "key " + seed);
+
+     try {
+         return kvMarshaller.marshal(freshKey);
+     } catch (MarshallerException e) {
+         throw new IgniteException(e);
+     }
+ }
+
+ /** Marshals the key-value pair into a binary row, rethrowing a marshalling failure as {@link IgniteException}. */
+ protected static BinaryRow binaryRow(TestKey key, TestValue value) {
+     BinaryRow marshalled;
+
+     try {
+         marshalled = kvMarshaller.marshal(key, value);
+     } catch (MarshallerException e) {
+         throw new IgniteException(e);
+     }
+
+     return marshalled;
+ }
+
+ /** Unmarshals the key part of the binary row, rethrowing a marshalling failure as {@link IgniteException}. */
+ protected static TestKey key(BinaryRow binaryRow) {
+     try {
+         Row row = new Row(schemaDescriptor, binaryRow);
+
+         return kvMarshaller.unmarshalKey(row);
+     } catch (MarshallerException e) {
+         throw new IgniteException(e);
+     }
+ }
+
+ /** Unmarshals the value part of the binary row, rethrowing a marshalling failure as {@link IgniteException}. */
+ protected static TestValue value(BinaryRow binaryRow) {
+     try {
+         Row row = new Row(schemaDescriptor, binaryRow);
+
+         return kvMarshaller.unmarshalValue(row);
+     } catch (MarshallerException e) {
+         throw new IgniteException(e);
+     }
+ }
+
+
+ /**
+  * Test pojo key made of an {@code int} part and a string part.
+  */
+ protected static class TestKey {
+     @IgniteToStringInclude
+     public int intKey;
+
+     @IgniteToStringInclude
+     public String strKey;
+
+     public TestKey() {
+     }
+
+     public TestKey(int intKey, String strKey) {
+         this.intKey = intKey;
+         this.strKey = strKey;
+     }
+
+     /** Two keys are equal when they have exactly the same class and both parts are equal. */
+     @Override
+     public boolean equals(Object o) {
+         if (o == this) {
+             return true;
+         }
+
+         if (o == null) {
+             return false;
+         }
+
+         if (o.getClass() != getClass()) {
+             return false;
+         }
+
+         TestKey other = (TestKey) o;
+
+         return other.intKey == intKey && Objects.equals(strKey, other.strKey);
+     }
+
+     @Override
+     public int hashCode() {
+         return Objects.hash(intKey, strKey);
+     }
+
+     @Override
+     public String toString() {
+         return S.toString(TestKey.class, this);
+     }
+ }
+
+ /**
+  * Test pojo value made of an integer part and a string part, ordered by the integer part first.
+  */
+ protected static class TestValue implements Comparable<TestValue> {
+     @IgniteToStringInclude
+     public Integer intVal;
+
+     @IgniteToStringInclude
+     public String strVal;
+
+     public TestValue() {
+     }
+
+     public TestValue(Integer intVal, String strVal) {
+         this.intVal = intVal;
+         this.strVal = strVal;
+     }
+
+     /** Compares by {@code intVal} and, when those are equal, by {@code strVal}. */
+     @Override
+     public int compareTo(TestValue o) {
+         int byInt = intVal.compareTo(o.intVal);
+
+         if (byInt != 0) {
+             return byInt;
+         }
+
+         return strVal.compareTo(o.strVal);
+     }
+
+     /** Two values are equal when they have exactly the same class and both parts are equal. */
+     @Override
+     public boolean equals(Object o) {
+         if (o == this) {
+             return true;
+         }
+
+         if (o == null) {
+             return false;
+         }
+
+         if (o.getClass() != getClass()) {
+             return false;
+         }
+
+         TestValue other = (TestValue) o;
+
+         return Objects.equals(intVal, other.intVal) && Objects.equals(strVal, other.strVal);
+     }
+
+     @Override
+     public int hashCode() {
+         return Objects.hash(intVal, strVal);
+     }
+
+     @Override
+     public String toString() {
+         return S.toString(TestValue.class, this);
+     }
+ }
+}
diff --git a/modules/table/src/test/java/org/apache/ignite/internal/table/impl/DummyInternalTableImpl.java b/modules/table/src/test/java/org/apache/ignite/internal/table/impl/DummyInternalTableImpl.java
index c33bd9c3f2..9869656427 100644
--- a/modules/table/src/test/java/org/apache/ignite/internal/table/impl/DummyInternalTableImpl.java
+++ b/modules/table/src/test/java/org/apache/ignite/internal/table/impl/DummyInternalTableImpl.java
@@ -209,7 +209,11 @@ public class DummyInternalTableImpl extends InternalTableImpl {
groupId,
tableId(),
primaryIndex,
- new HybridClock()
+ new HybridClock(),
+ null,
+ null,
+ null
+
);
partitionListener = new PartitionListener(
diff --git a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/message/TxMessageGroup.java b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/message/TxMessageGroup.java
index cb920878a6..c9ab6bedec 100644
--- a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/message/TxMessageGroup.java
+++ b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/message/TxMessageGroup.java
@@ -38,4 +38,9 @@ public class TxMessageGroup {
* Message type for {@link TxCleanupReplicaRequest}.
*/
public static final short TX_CLEANUP_REQUEST = 2;
+
+ /**
+ * Message type for {@link TxStateReplicaRequest}.
+ */
+ public static final short TX_STATE_REQUEST = 3;
}
diff --git a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/message/TxMessageGroup.java b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/message/TxStateReplicaRequest.java
similarity index 60%
copy from modules/transactions/src/main/java/org/apache/ignite/internal/tx/message/TxMessageGroup.java
copy to modules/transactions/src/main/java/org/apache/ignite/internal/tx/message/TxStateReplicaRequest.java
index cb920878a6..43a8466cb3 100644
--- a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/message/TxMessageGroup.java
+++ b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/message/TxStateReplicaRequest.java
@@ -17,25 +17,20 @@
package org.apache.ignite.internal.tx.message;
-import org.apache.ignite.network.annotations.MessageGroup;
+import java.util.UUID;
+import org.apache.ignite.hlc.HybridTimestamp;
+import org.apache.ignite.internal.replicator.message.ReplicaRequest;
+import org.apache.ignite.network.annotations.Marshallable;
+import org.apache.ignite.network.annotations.Transferable;
/**
- * Message types for transactions.
+ * Transaction state request.
*/
-@MessageGroup(groupType = 5, groupName = "TxMessages")
-public class TxMessageGroup {
- /**
- * Message type for {@link TxFinishReplicaRequest}.
- */
- public static final short TX_FINISH_REQUEST = 0;
+@Transferable(TxMessageGroup.TX_STATE_REQUEST)
+public interface TxStateReplicaRequest extends ReplicaRequest {
+ /** Id of the transaction whose state is requested. */
+ @Marshallable
+ UUID txId();
- /**
- * Message type for {@link TxFinishResponse}.
- */
- public static final short TX_FINISH_RESPONSE = 1;
-
- /**
- * Message type for {@link TxCleanupReplicaRequest}.
- */
- public static final short TX_CLEANUP_REQUEST = 2;
+ /**
+  * Timestamp sent along with the state request.
+  * NOTE(review): PartitionReplicaListenerTest in this commit passes a read timestamp here - confirm the intended meaning.
+  */
+ @Marshallable
+ HybridTimestamp commitTimestamp();
}