You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sa...@apache.org on 2023/11/29 14:09:35 UTC
(ignite-3) branch main updated: IGNITE-20768 Test coverage of batch operations (#2893)
This is an automated email from the ASF dual-hosted git repository.
sanpwc pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new ee489051fd IGNITE-20768 Test coverage of batch operations (#2893)
ee489051fd is described below
commit ee489051fd1ee13f11c9246eed58d4e818177893
Author: Cyrill <cy...@gmail.com>
AuthorDate: Wed Nov 29 17:09:29 2023 +0300
IGNITE-20768 Test coverage of batch operations (#2893)
---
.../table/distributed/StorageCleanupTest.java | 427 +++++++++++++++++++++
.../replication/PartitionReplicaListenerTest.java | 30 ++
2 files changed, 457 insertions(+)
diff --git a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/StorageCleanupTest.java b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/StorageCleanupTest.java
index 92dcd6451e..79ad4870ac 100644
--- a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/StorageCleanupTest.java
+++ b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/StorageCleanupTest.java
@@ -19,6 +19,7 @@ package org.apache.ignite.internal.table.distributed;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.ArgumentMatchers.any;
@@ -29,6 +30,7 @@ import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
+import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
@@ -42,6 +44,7 @@ import org.apache.ignite.internal.schema.BinaryRowConverter;
import org.apache.ignite.internal.schema.BinaryTupleSchema;
import org.apache.ignite.internal.schema.ColumnsExtractor;
import org.apache.ignite.internal.storage.BaseMvStoragesTest;
+import org.apache.ignite.internal.storage.ReadResult;
import org.apache.ignite.internal.storage.RowId;
import org.apache.ignite.internal.storage.impl.TestMvPartitionStorage;
import org.apache.ignite.internal.storage.index.StorageHashIndexDescriptor;
@@ -51,6 +54,7 @@ import org.apache.ignite.internal.storage.index.StorageSortedIndexDescriptor.Sto
import org.apache.ignite.internal.storage.index.impl.TestHashIndexStorage;
import org.apache.ignite.internal.storage.index.impl.TestSortedIndexStorage;
import org.apache.ignite.internal.table.distributed.index.IndexUpdateHandler;
+import org.apache.ignite.internal.table.distributed.replicator.TimedBinaryRow;
import org.apache.ignite.internal.table.impl.DummyInternalTableImpl;
import org.apache.ignite.internal.type.NativeTypes;
import org.junit.jupiter.api.BeforeEach;
@@ -205,6 +209,88 @@ public class StorageCleanupTest extends BaseMvStoragesTest {
verify(storage, never()).commitWrite(any(), any());
}
+ @Test
+ void testSimpleCommitBatch() {
+ UUID txUuid = UUID.randomUUID();
+
+ HybridTimestamp commitTs = CLOCK.now();
+
+ BinaryRow row1 = binaryRow(new TestKey(1, "foo1"), new TestValue(2, "bar"));
+ BinaryRow row2 = binaryRow(new TestKey(3, "foo3"), new TestValue(4, "baz"));
+ BinaryRow row3 = binaryRow(new TestKey(5, "foo5"), new TestValue(7, "zzu"));
+
+ TablePartitionId partitionId = new TablePartitionId(333, PARTITION_ID);
+
+ TimedBinaryRow tb1 = new TimedBinaryRow(row1, null);
+ TimedBinaryRow tb2 = new TimedBinaryRow(row2, null);
+ TimedBinaryRow tb3 = new TimedBinaryRow(row3, null);
+
+ UUID id1 = UUID.randomUUID();
+ UUID id2 = UUID.randomUUID();
+ UUID id3 = UUID.randomUUID();
+
+ Map<UUID, TimedBinaryRow> rowsToUpdate = Map.of(
+ id1, tb1,
+ id2, tb2,
+ id3, tb3
+ );
+ storageUpdateHandler.handleUpdateAll(txUuid, rowsToUpdate, partitionId, true, null, null);
+
+ assertEquals(3, storage.rowsCount());
+ // We have three writes to the storage.
+ verify(storage, times(3)).addWrite(any(), any(), any(), anyInt(), anyInt());
+
+ storageUpdateHandler.handleTransactionCleanup(txUuid, true, commitTs);
+
+ assertEquals(3, storage.rowsCount());
+ // Those writes resulted in three commits.
+ verify(storage, times(3)).commitWrite(any(), any());
+
+ // Now reset the invocation counter.
+ clearInvocations(storage);
+
+ // And run cleanup again for the same transaction.
+ storageUpdateHandler.handleTransactionCleanup(txUuid, true, commitTs);
+
+ assertEquals(3, storage.rowsCount());
+ // No further commits were made, which shows that the cleanup is idempotent.
+ verify(storage, never()).commitWrite(any(), any());
+
+ ReadResult result1 = storage.read(new RowId(partitionId.partitionId(), id1), HybridTimestamp.MAX_VALUE);
+ assertEquals(row1, result1.binaryRow());
+
+ ReadResult result2 = storage.read(new RowId(partitionId.partitionId(), id2), HybridTimestamp.MAX_VALUE);
+ assertEquals(row2, result2.binaryRow());
+
+ ReadResult result3 = storage.read(new RowId(partitionId.partitionId(), id3), HybridTimestamp.MAX_VALUE);
+ assertEquals(row3, result3.binaryRow());
+
+ // Reset the invocation counter.
+ clearInvocations(storage);
+
+ // Now delete rows with a batch
+ Map<UUID, TimedBinaryRow> rowsToDelete = new HashMap<>();
+ rowsToDelete.put(id2, null);
+ rowsToDelete.put(id3, null);
+
+ storageUpdateHandler.handleUpdateAll(txUuid, rowsToDelete, partitionId, true, null, null);
+
+ // We have two writes to the storage, one per deleted row.
+ verify(storage, times(2)).addWrite(any(), any(), any(), anyInt(), anyInt());
+
+ // And run cleanup again for the same transaction.
+ storageUpdateHandler.handleTransactionCleanup(txUuid, true, commitTs);
+
+ ReadResult resultAfterDelete1 = storage.read(new RowId(partitionId.partitionId(), id1), HybridTimestamp.MAX_VALUE);
+ assertEquals(row1, resultAfterDelete1.binaryRow());
+
+ ReadResult resultAfterDelete2 = storage.read(new RowId(partitionId.partitionId(), id2), HybridTimestamp.MAX_VALUE);
+ assertNull(resultAfterDelete2.binaryRow());
+
+ ReadResult resultAfterDelete3 = storage.read(new RowId(partitionId.partitionId(), id3), HybridTimestamp.MAX_VALUE);
+ assertNull(resultAfterDelete3.binaryRow());
+ }
+
@Test
void testCleanupAndUpdateRow() {
UUID txUuid = UUID.randomUUID();
@@ -266,6 +352,80 @@ public class StorageCleanupTest extends BaseMvStoragesTest {
verify(storage, times(2)).commitWrite(any(), any());
}
+ @Test
+ void testCleanupAndUpdateRowBatch() {
+ UUID txUuid = UUID.randomUUID();
+
+ HybridTimestamp commitTs = CLOCK.now();
+
+ BinaryRow row1 = binaryRow(new TestKey(1, "foo1"), new TestValue(2, "bar"));
+ BinaryRow row2 = binaryRow(new TestKey(3, "foo3"), new TestValue(4, "baz"));
+ BinaryRow row3 = binaryRow(new TestKey(5, "foo5"), new TestValue(7, "zzu"));
+
+ TablePartitionId partitionId = new TablePartitionId(333, PARTITION_ID);
+
+
+ UUID row1Id = UUID.randomUUID();
+ UUID row2Id = UUID.randomUUID();
+ UUID row3Id = UUID.randomUUID();
+
+ TimedBinaryRow tb1 = new TimedBinaryRow(row1, null);
+ TimedBinaryRow tb2 = new TimedBinaryRow(row2, null);
+ TimedBinaryRow tb3 = new TimedBinaryRow(row3, null);
+
+ Map<UUID, TimedBinaryRow> rowsToUpdate = Map.of(
+ row1Id, tb1,
+ row2Id, tb2,
+ row3Id, tb3
+ );
+ // Do not track write intents to simulate the loss of a volatile state.
+ storageUpdateHandler.handleUpdateAll(txUuid, rowsToUpdate, partitionId, false, null, null);
+
+ assertEquals(3, storage.rowsCount());
+
+ // Now run cleanup.
+ storageUpdateHandler.handleTransactionCleanup(txUuid, true, commitTs);
+
+ // But the loss of the state results in no cleanup, and the entries are still write intents.
+ verify(storage, never()).commitWrite(any(), any());
+
+ // Now imagine we have another transaction that resolves the row, does the cleanup and commits its own data.
+
+ // Resolve one of the rows affected by the committed transaction.
+ storageUpdateHandler.handleWriteIntentRead(txUuid, new RowId(PARTITION_ID, row1Id));
+
+ // Run the cleanup.
+ storageUpdateHandler.handleTransactionCleanup(txUuid, true, commitTs);
+
+ // Only the discovered write intent was committed, the other two are still write intents.
+ verify(storage, times(1)).commitWrite(any(), any());
+
+ BinaryRow row4 = binaryRow(new TestKey(1, "foo1"), new TestValue(20, "bar20"));
+
+ UUID newTxUuid = UUID.randomUUID();
+
+ // Insert new write intent in the new transaction.
+ TimedBinaryRow tb4 = new TimedBinaryRow(row4, null);
+
+ Map<UUID, TimedBinaryRow> rowsToUpdate2 = Map.of(
+ row1Id, tb4
+ );
+ storageUpdateHandler.handleUpdateAll(newTxUuid, rowsToUpdate2, partitionId, true, null, null);
+
+ // And concurrently the other two intents were also resolved and scheduled for cleanup.
+ storageUpdateHandler.handleWriteIntentRead(txUuid, new RowId(PARTITION_ID, row2Id));
+ storageUpdateHandler.handleWriteIntentRead(txUuid, new RowId(PARTITION_ID, row3Id));
+
+ // Now reset the invocation counter.
+ clearInvocations(storage);
+
+ // Run cleanup for the original transaction
+ storageUpdateHandler.handleTransactionCleanup(txUuid, true, commitTs);
+
+ // Only those two entries will be affected.
+ verify(storage, times(2)).commitWrite(any(), any());
+ }
+
@Test
void testCleanupBeforeUpdateNoData() {
UUID runningTx = UUID.randomUUID();
@@ -322,6 +482,50 @@ public class StorageCleanupTest extends BaseMvStoragesTest {
verify(indexUpdateHandler, never()).tryRemoveFromIndexes(any(), any(), any());
}
+ @Test
+ void testCleanupBeforeUpdateNoWriteIntentBatch() {
+ UUID committedTx = UUID.randomUUID();
+
+ TablePartitionId partitionId = new TablePartitionId(333, PARTITION_ID);
+
+ // First commit a row
+
+ BinaryRow row1 = binaryRow(new TestKey(1, "foo1"), new TestValue(2, "bar"));
+
+ UUID rowId = UUID.randomUUID();
+
+ HybridTimestamp commitTs = CLOCK.now();
+
+ Map<UUID, TimedBinaryRow> rowsToUpdate = Map.of(
+ rowId, new TimedBinaryRow(row1, null)
+ );
+
+ storageUpdateHandler.handleUpdateAll(committedTx, rowsToUpdate, partitionId, true, null, null);
+
+ storageUpdateHandler.handleTransactionCleanup(committedTx, true, commitTs);
+
+ assertEquals(1, storage.rowsCount());
+
+ assertFalse(storage.read(new RowId(PARTITION_ID, rowId), HybridTimestamp.MAX_VALUE).isWriteIntent());
+
+ // Now create a new write intent over the committed data. No cleanup should be triggered.
+
+ BinaryRow row2 = binaryRow(new TestKey(3, "foo3"), new TestValue(4, "baz"));
+
+ UUID runningTx = UUID.randomUUID();
+
+ clearInvocations(storage, indexUpdateHandler);
+
+ Map<UUID, TimedBinaryRow> rowsToUpdate2 = Map.of(
+ rowId, new TimedBinaryRow(row2, commitTs)
+ );
+ storageUpdateHandler.handleUpdateAll(runningTx, rowsToUpdate2, partitionId, true, null, null);
+
+ verify(storage, never()).commitWrite(any(), any());
+ verify(storage, never()).abortWrite(any());
+ verify(indexUpdateHandler, never()).tryRemoveFromIndexes(any(), any(), any());
+ }
+
@Test
void testCleanupBeforeUpdateSameTxOnlyWriteIntent() {
UUID runningTx = UUID.randomUUID();
@@ -359,6 +563,51 @@ public class StorageCleanupTest extends BaseMvStoragesTest {
verify(indexUpdateHandler, times(1)).tryRemoveFromIndexes(any(), any(), any());
}
+ @Test
+ void testCleanupBeforeUpdateSameTxOnlyWriteIntentBatch() {
+ UUID runningTx = UUID.randomUUID();
+
+ TablePartitionId partitionId = new TablePartitionId(333, PARTITION_ID);
+
+ // Create a write intent.
+
+ BinaryRow row1 = binaryRow(new TestKey(1, "foo1"), new TestValue(2, "bar"));
+
+ UUID rowId = UUID.randomUUID();
+
+ HybridTimestamp commitTs = CLOCK.now();
+
+ Map<UUID, TimedBinaryRow> rowsToUpdate = Map.of(
+ rowId, new TimedBinaryRow(row1, null)
+ );
+
+ storageUpdateHandler.handleUpdateAll(runningTx, rowsToUpdate, partitionId, true, null, null);
+
+ assertEquals(1, storage.rowsCount());
+
+ assertTrue(storage.read(new RowId(PARTITION_ID, rowId), HybridTimestamp.MAX_VALUE).isWriteIntent());
+
+ // Then create another one for the same row in the same transaction. The entry will be replaced.
+
+ BinaryRow row2 = binaryRow(new TestKey(3, "foo3"), new TestValue(4, "baz"));
+
+ clearInvocations(storage, indexUpdateHandler);
+
+ Map<UUID, TimedBinaryRow> rowsToUpdate2 = Map.of(
+ rowId, new TimedBinaryRow(row2, commitTs)
+ );
+ // Write intents are tracked for this update as well; the same transaction replaces its own write intent.
+ storageUpdateHandler.handleUpdateAll(runningTx, rowsToUpdate2, partitionId, true, null, null);
+
+ assertEquals(1, storage.rowsCount());
+
+ assertTrue(storage.read(new RowId(PARTITION_ID, rowId), HybridTimestamp.MAX_VALUE).isWriteIntent());
+
+ verify(storage, never()).commitWrite(any(), any());
+ verify(storage, never()).abortWrite(any());
+ verify(indexUpdateHandler, times(1)).tryRemoveFromIndexes(any(), any(), any());
+ }
+
@Test
void testCleanupBeforeUpdateDifferentTxOnlyWriteIntent() {
UUID runningTx = UUID.randomUUID();
@@ -399,6 +648,54 @@ public class StorageCleanupTest extends BaseMvStoragesTest {
verify(indexUpdateHandler, never()).tryRemoveFromIndexes(any(), any(), any());
}
+ @Test
+ void testCleanupBeforeUpdateDifferentTxOnlyWriteIntentBatch() {
+ UUID runningTx = UUID.randomUUID();
+
+ TablePartitionId partitionId = new TablePartitionId(333, PARTITION_ID);
+
+ // Create a new write intent.
+
+ BinaryRow row1 = binaryRow(new TestKey(1, "foo1"), new TestValue(2, "bar"));
+
+ UUID rowId = UUID.randomUUID();
+
+ HybridTimestamp commitTs = CLOCK.now();
+
+ Map<UUID, TimedBinaryRow> rowsToUpdate = Map.of(
+ rowId, new TimedBinaryRow(row1, null)
+ );
+
+ storageUpdateHandler.handleUpdateAll(runningTx, rowsToUpdate, partitionId, true, null, null);
+
+ assertEquals(1, storage.rowsCount());
+
+ assertTrue(storage.read(new RowId(PARTITION_ID, rowId), HybridTimestamp.MAX_VALUE).isWriteIntent());
+
+ // Create another one and pass `last commit time`. The previous value should be committed automatically.
+
+ BinaryRow row2 = binaryRow(new TestKey(3, "foo3"), new TestValue(4, "baz"));
+
+ clearInvocations(storage, indexUpdateHandler);
+
+ UUID runningTx2 = UUID.randomUUID();
+
+ // Previous value will be committed even though the cleanup was not called explicitly.
+ Map<UUID, TimedBinaryRow> rowsToUpdate2 = Map.of(
+ rowId, new TimedBinaryRow(row2, commitTs)
+ );
+
+ storageUpdateHandler.handleUpdateAll(runningTx2, rowsToUpdate2, partitionId, true, null, null);
+
+ assertEquals(1, storage.rowsCount());
+
+ assertTrue(storage.read(new RowId(PARTITION_ID, rowId), HybridTimestamp.MAX_VALUE).isWriteIntent());
+
+ verify(storage, times(1)).commitWrite(any(), any());
+ verify(storage, never()).abortWrite(any());
+ verify(indexUpdateHandler, never()).tryRemoveFromIndexes(any(), any(), any());
+ }
+
@Test
void testCleanupBeforeUpdateAbortWriteIntent() {
UUID committed1 = UUID.randomUUID();
@@ -451,6 +748,70 @@ public class StorageCleanupTest extends BaseMvStoragesTest {
verify(indexUpdateHandler, times(1)).tryRemoveFromIndexes(any(), any(), any());
}
+ @Test
+ void testCleanupBeforeUpdateAbortWriteIntentBatch() {
+ UUID committed1 = UUID.randomUUID();
+
+ TablePartitionId partitionId = new TablePartitionId(333, PARTITION_ID);
+
+ // First commit an entry.
+
+ BinaryRow row1 = binaryRow(new TestKey(1, "foo1"), new TestValue(2, "bar"));
+
+ UUID rowId = UUID.randomUUID();
+
+ HybridTimestamp commitTs = CLOCK.now();
+
+ Map<UUID, TimedBinaryRow> rowsToUpdate = Map.of(
+ rowId, new TimedBinaryRow(row1, null)
+ );
+
+ storageUpdateHandler.handleUpdateAll(committed1, rowsToUpdate, partitionId, true, null, null);
+
+ storageUpdateHandler.handleTransactionCleanup(committed1, true, commitTs);
+
+ assertEquals(1, storage.rowsCount());
+
+ assertFalse(storage.read(new RowId(PARTITION_ID, rowId), HybridTimestamp.MAX_VALUE).isWriteIntent());
+
+ // Now add a new write intent.
+
+ BinaryRow row2 = binaryRow(new TestKey(3, "foo3"), new TestValue(4, "baz"));
+
+ UUID committed2 = UUID.randomUUID();
+
+ Map<UUID, TimedBinaryRow> rowsToUpdate2 = Map.of(
+ rowId, new TimedBinaryRow(row2, commitTs)
+ );
+
+ storageUpdateHandler.handleUpdateAll(committed2, rowsToUpdate2, partitionId, true, null, null);
+
+ assertTrue(storage.read(new RowId(PARTITION_ID, rowId), HybridTimestamp.MAX_VALUE).isWriteIntent());
+
+ // Create another write intent and provide `last commit time`.
+
+ clearInvocations(storage, indexUpdateHandler);
+
+ UUID runningTx = UUID.randomUUID();
+
+ BinaryRow row3 = binaryRow(new TestKey(5, "foo5"), new TestValue(7, "zzu"));
+
+ // Last commit time is equal to the time of the previously committed value => previous write intent will be reverted.
+ Map<UUID, TimedBinaryRow> rowsToUpdate3 = Map.of(
+ rowId, new TimedBinaryRow(row3, commitTs)
+ );
+
+ storageUpdateHandler.handleUpdateAll(runningTx, rowsToUpdate3, partitionId, true, null, null);
+
+ assertEquals(1, storage.rowsCount());
+
+ assertTrue(storage.read(new RowId(PARTITION_ID, rowId), HybridTimestamp.MAX_VALUE).isWriteIntent());
+
+ verify(storage, never()).commitWrite(any(), any());
+ verify(storage, times(1)).abortWrite(any());
+ verify(indexUpdateHandler, times(1)).tryRemoveFromIndexes(any(), any(), any());
+ }
+
@Test
void testCleanupBeforeUpdateCommitWriteIntent() {
UUID committed1 = UUID.randomUUID();
@@ -505,6 +866,72 @@ public class StorageCleanupTest extends BaseMvStoragesTest {
verify(indexUpdateHandler, never()).tryRemoveFromIndexes(any(), any(), any());
}
+ @Test
+ void testCleanupBeforeUpdateCommitWriteIntentBatch() {
+ UUID committed1 = UUID.randomUUID();
+
+ TablePartitionId partitionId = new TablePartitionId(333, PARTITION_ID);
+
+ // First commit an entry.
+
+ BinaryRow row1 = binaryRow(new TestKey(1, "foo1"), new TestValue(2, "bar"));
+
+ UUID rowId = UUID.randomUUID();
+
+ HybridTimestamp commitTs = CLOCK.now();
+
+ Map<UUID, TimedBinaryRow> rowsToUpdate = Map.of(
+ rowId, new TimedBinaryRow(row1, null)
+ );
+
+ storageUpdateHandler.handleUpdateAll(committed1, rowsToUpdate, partitionId, true, null, null);
+
+ storageUpdateHandler.handleTransactionCleanup(committed1, true, commitTs);
+
+ assertEquals(1, storage.rowsCount());
+
+ assertFalse(storage.read(new RowId(PARTITION_ID, rowId), HybridTimestamp.MAX_VALUE).isWriteIntent());
+
+ // Now add a new write intent.
+
+ BinaryRow row2 = binaryRow(new TestKey(3, "foo3"), new TestValue(4, "baz"));
+
+ UUID committed2 = UUID.randomUUID();
+
+ Map<UUID, TimedBinaryRow> rowsToUpdate2 = Map.of(
+ rowId, new TimedBinaryRow(row2, commitTs)
+ );
+
+ storageUpdateHandler.handleUpdateAll(committed2, rowsToUpdate2, partitionId, true, null, null);
+
+ assertTrue(storage.read(new RowId(PARTITION_ID, rowId), HybridTimestamp.MAX_VALUE).isWriteIntent());
+
+ // Create another write intent and provide `last commit time`.
+
+ clearInvocations(storage, indexUpdateHandler);
+
+ UUID runningTx = UUID.randomUUID();
+
+ BinaryRow row3 = binaryRow(new TestKey(5, "foo5"), new TestValue(7, "zzu"));
+
+ HybridTimestamp lastCommitTs = commitTs.addPhysicalTime(100);
+
+ // Last commit time is after the time of the previously committed value => previous write intent will be committed.
+ Map<UUID, TimedBinaryRow> rowsToUpdate3 = Map.of(
+ rowId, new TimedBinaryRow(row3, lastCommitTs)
+ );
+
+ storageUpdateHandler.handleUpdateAll(runningTx, rowsToUpdate3, partitionId, true, null, null);
+
+ assertEquals(1, storage.rowsCount());
+
+ assertTrue(storage.read(new RowId(PARTITION_ID, rowId), HybridTimestamp.MAX_VALUE).isWriteIntent());
+
+ verify(storage, times(1)).commitWrite(any(), any());
+ verify(storage, never()).abortWrite(any());
+ verify(indexUpdateHandler, never()).tryRemoveFromIndexes(any(), any(), any());
+ }
+
@Test
void testCleanupBeforeUpdateError() {
UUID committed1 = UUID.randomUUID();
diff --git a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerTest.java b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerTest.java
index 32ac621d8d..5071d0d5ad 100644
--- a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerTest.java
+++ b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerTest.java
@@ -128,6 +128,7 @@ import org.apache.ignite.internal.table.distributed.command.PartitionCommand;
import org.apache.ignite.internal.table.distributed.command.TablePartitionIdMessage;
import org.apache.ignite.internal.table.distributed.command.TxCleanupCommand;
import org.apache.ignite.internal.table.distributed.command.UpdateCommand;
+import org.apache.ignite.internal.table.distributed.command.UpdateCommandImpl;
import org.apache.ignite.internal.table.distributed.index.IndexUpdateHandler;
import org.apache.ignite.internal.table.distributed.raft.PartitionDataStorage;
import org.apache.ignite.internal.table.distributed.replication.request.BinaryRowMessage;
@@ -1324,6 +1325,35 @@ public class PartitionReplicaListenerTest extends IgniteAbstractTest {
assertThat(writeAfterCleanupFuture, willThrowFast(TransactionException.class));
}
+ @Test
+ void testWriteIntentBearsLastCommitTimestamp() {
+ BinaryRow br1 = binaryRow(1);
+
+ BinaryRow br2 = binaryRow(2);
+
+ // First insert two rows within the same transaction.
+ UUID tx0 = newTxId();
+ upsert(tx0, br1);
+ upsert(tx0, br2);
+
+ cleanup(tx0);
+
+ raftClientFutureClosure = partitionCommand -> {
+ assertTrue(partitionCommand instanceof UpdateCommandImpl);
+
+ UpdateCommandImpl impl = (UpdateCommandImpl) partitionCommand;
+
+ assertNotNull(impl.messageRowToUpdate());
+ assertNotNull(impl.messageRowToUpdate().binaryRow());
+ assertNotNull(impl.messageRowToUpdate().timestamp());
+
+ return defaultMockRaftFutureClosure.apply(partitionCommand);
+ };
+
+ UUID tx1 = newTxId();
+ upsert(tx1, br1);
+ }
+
/**
* Puts several records into the storage, optionally leaving them as write intents, alternately deleting and upserting the same row
* within the same RW transaction, then checking read correctness via read only request.