You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sd...@apache.org on 2023/02/08 13:54:49 UTC

[ignite-3] branch ignite-18738 created (now a33a15a8c2)

This is an automated email from the ASF dual-hosted git repository.

sdanilov pushed a change to branch ignite-18738
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


      at a33a15a8c2 IGNITE-18738 Cleanup indexes on transaction abort

This branch includes the following new commits:

     new a33a15a8c2 IGNITE-18738 Cleanup indexes on transaction abort

The 1 revision listed above as "new" is entirely new to this
repository and will be described in a separate email.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.



[ignite-3] 01/01: IGNITE-18738 Cleanup indexes on transaction abort

Posted by sd...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

sdanilov pushed a commit to branch ignite-18738
in repository https://gitbox.apache.org/repos/asf/ignite-3.git

commit a33a15a8c2a7b58ccce3d44bb3d85dccc9059631
Author: Semyon Danilov <sa...@yandex.ru>
AuthorDate: Wed Feb 8 17:54:39 2023 +0400

    IGNITE-18738 Cleanup indexes on transaction abort
---
 .../storage/index/impl/TestHashIndexStorage.java   |   5 +
 .../storage/index/impl/TestSortedIndexStorage.java |   6 +
 .../table/distributed/StorageUpdateHandler.java    |  57 +++-
 .../distributed/TableSchemaAwareIndexStorage.java  |   4 +-
 .../distributed/raft/PartitionDataStorage.java     |  13 +
 .../table/distributed/raft/PartitionListener.java  |  27 +-
 .../SnapshotAwarePartitionDataStorage.java         |   9 +
 .../table/distributed/IndexCleanupTest.java        | 370 +++++++++++++++++++++
 .../distributed/TestPartitionDataStorage.java      |   7 +
 9 files changed, 482 insertions(+), 16 deletions(-)

diff --git a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/index/impl/TestHashIndexStorage.java b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/index/impl/TestHashIndexStorage.java
index 9dbdf16928..4d768ee577 100644
--- a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/index/impl/TestHashIndexStorage.java
+++ b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/index/impl/TestHashIndexStorage.java
@@ -25,6 +25,7 @@ import java.util.Iterator;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
+import java.util.stream.Collectors;
 import org.apache.ignite.internal.schema.BinaryTuple;
 import org.apache.ignite.internal.storage.RowId;
 import org.apache.ignite.internal.storage.StorageClosedException;
@@ -197,4 +198,8 @@ public class TestHashIndexStorage implements HashIndexStorage {
 
         rebalance = false;
     }
+
+    public Set<RowId> allRows() {
+        return index.values().stream().flatMap(Set::stream).collect(Collectors.toSet());
+    }
 }
diff --git a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/index/impl/TestSortedIndexStorage.java b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/index/impl/TestSortedIndexStorage.java
index 9d15eabdc2..b2cd8507d1 100644
--- a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/index/impl/TestSortedIndexStorage.java
+++ b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/index/impl/TestSortedIndexStorage.java
@@ -25,8 +25,10 @@ import java.util.Map.Entry;
 import java.util.NavigableMap;
 import java.util.NavigableSet;
 import java.util.NoSuchElementException;
+import java.util.Set;
 import java.util.concurrent.ConcurrentNavigableMap;
 import java.util.concurrent.ConcurrentSkipListMap;
+import java.util.stream.Collectors;
 import org.apache.ignite.internal.binarytuple.BinaryTupleCommon;
 import org.apache.ignite.internal.schema.BinaryTuple;
 import org.apache.ignite.internal.schema.BinaryTuplePrefix;
@@ -371,4 +373,8 @@ public class TestSortedIndexStorage implements SortedIndexStorage {
 
         rebalance = false;
     }
+
+    public Set<RowId> allRows() {
+        return index.values().stream().flatMap(m -> m.keySet().stream()).collect(Collectors.toSet());
+    }
 }
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/StorageUpdateHandler.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/StorageUpdateHandler.java
index dae55ead81..7b55b47ca3 100644
--- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/StorageUpdateHandler.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/StorageUpdateHandler.java
@@ -24,13 +24,16 @@ import java.util.ArrayList;
 import java.util.Collection;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.UUID;
 import java.util.function.Consumer;
 import java.util.function.Supplier;
 import org.apache.ignite.internal.schema.TableRow;
+import org.apache.ignite.internal.storage.ReadResult;
 import org.apache.ignite.internal.storage.RowId;
 import org.apache.ignite.internal.table.distributed.raft.PartitionDataStorage;
 import org.apache.ignite.internal.table.distributed.replicator.TablePartitionId;
+import org.apache.ignite.internal.util.Cursor;
 import org.jetbrains.annotations.Nullable;
 
 /**
@@ -71,7 +74,7 @@ public class StorageUpdateHandler {
             UUID txId,
             UUID rowUuid,
             TablePartitionId commitPartitionId,
-            ByteBuffer rowBuffer,
+            @Nullable ByteBuffer rowBuffer,
             @Nullable Consumer<RowId> onReplication
     ) {
         storage.runConsistently(() -> {
@@ -80,7 +83,12 @@ public class StorageUpdateHandler {
             UUID commitTblId = commitPartitionId.tableId();
             int commitPartId = commitPartitionId.partitionId();
 
-            storage.addWrite(rowId, row, txId, commitTblId, commitPartId);
+            TableRow oldRow = storage.addWrite(rowId, row, txId, commitTblId, commitPartId);
+
+            if (oldRow != null) {
+                // Previous uncommitted row should be removed from indexes.
+                removeFromIndex(oldRow, rowId);
+            }
 
             if (onReplication != null) {
                 onReplication.accept(rowId);
@@ -117,7 +125,12 @@ public class StorageUpdateHandler {
                     RowId rowId = new RowId(partitionId, entry.getKey());
                     TableRow row = entry.getValue() != null ? new TableRow(entry.getValue()) : null;
 
-                    storage.addWrite(rowId, row, txId, commitTblId, commitPartId);
+                    TableRow oldRow = storage.addWrite(rowId, row, txId, commitTblId, commitPartId);
+
+                    if (oldRow != null) {
+                        // Previous uncommitted row should be removed from indexes.
+                        removeFromIndex(oldRow, rowId);
+                    }
 
                     rowIds.add(rowId);
                     addToIndexes(row, rowId);
@@ -132,6 +145,44 @@ public class StorageUpdateHandler {
         });
     }
 
+    /**
+     * Handles a transaction abort: removes index entries of non-tombstone pending write-intents, then aborts the writes.
+     *
+     * @param pendingRowIds Row ids of write-intents to be rolled back.
+     * @param onReplication Callback that is run inside the same {@code runConsistently} closure after the writes are aborted.
+     */
+    public void handleTransactionAbortion(Set<RowId> pendingRowIds, Runnable onReplication) {
+        storage.runConsistently(() -> {
+            for (RowId rowId : pendingRowIds) {
+                try (Cursor<ReadResult> cursor = storage.scanVersions(rowId)) {
+                    if (cursor.hasNext()) {
+                        ReadResult item = cursor.next();
+                        if (item.isWriteIntent()) {
+                            TableRow row = item.tableRow();
+
+                            // Tombstones are never indexed. NOTE(review): verify committed versions cannot share these index keys.
+                            if (row != null) {
+                                removeFromIndex(row, item.rowId());
+                            }
+                        }
+                    }
+                }
+            }
+
+            pendingRowIds.forEach(storage::abortWrite);
+
+            onReplication.run();
+
+            return null;
+        });
+    }
+
+    private void removeFromIndex(TableRow row, RowId rowId) {
+        for (TableSchemaAwareIndexStorage index : indexes.get().values()) {
+            index.remove(row, rowId);
+        }
+    }
+
     private void addToIndexes(@Nullable TableRow tableRow, RowId rowId) {
         if (tableRow == null) { // skip removes
             return;
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableSchemaAwareIndexStorage.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableSchemaAwareIndexStorage.java
index be0578d8f8..dc661a5f00 100644
--- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableSchemaAwareIndexStorage.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableSchemaAwareIndexStorage.java
@@ -81,8 +81,8 @@ public class TableSchemaAwareIndexStorage {
      * @param tableRow A table row to remove.
      * @param rowId An identifier of a row in a main storage.
      */
-    public void remove(BinaryRow tableRow, RowId rowId) {
-        BinaryTuple tuple = indexBinaryRowResolver.apply(tableRow);
+    public void remove(TableRow tableRow, RowId rowId) {
+        BinaryTuple tuple = indexTableRowResolver.apply(tableRow);
 
         storage.remove(new IndexRowImpl(tuple, rowId));
     }
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionDataStorage.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionDataStorage.java
index be097690b9..b008493968 100644
--- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionDataStorage.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionDataStorage.java
@@ -25,9 +25,11 @@ import org.apache.ignite.internal.schema.TableRow;
 import org.apache.ignite.internal.storage.MvPartitionStorage;
 import org.apache.ignite.internal.storage.MvPartitionStorage.WriteClosure;
 import org.apache.ignite.internal.storage.RaftGroupConfiguration;
+import org.apache.ignite.internal.storage.ReadResult;
 import org.apache.ignite.internal.storage.RowId;
 import org.apache.ignite.internal.storage.StorageException;
 import org.apache.ignite.internal.storage.TxIdMismatchException;
+import org.apache.ignite.internal.util.Cursor;
 import org.jetbrains.annotations.Nullable;
 import org.jetbrains.annotations.TestOnly;
 
@@ -162,6 +164,17 @@ public interface PartitionDataStorage extends ManuallyCloseable {
      */
     void commitWrite(RowId rowId, HybridTimestamp timestamp) throws StorageException;
 
+    /**
+     * Scans all versions of a single row.
+     *
+     * <p>{@link ReadResult#newestCommitTimestamp()} is NOT filled by this method for intents having preceding committed
+     * versions.
+     *
+     * @param rowId Row id.
+     * @return Cursor of results including both rows data and transaction-related context. The versions are ordered from newest to oldest.
+     */
+    Cursor<ReadResult> scanVersions(RowId rowId) throws StorageException;
+
     /**
      * Returns the underlying {@link MvPartitionStorage}. Only for tests!
      *
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionListener.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionListener.java
index 9514245eb7..8b8a33fd43 100644
--- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionListener.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionListener.java
@@ -313,23 +313,28 @@ public class PartitionListener implements RaftGroupListener {
             return;
         }
 
-        storage.runConsistently(() -> {
-            UUID txId = cmd.txId();
+        UUID txId = cmd.txId();
 
-            Set<RowId> pendingRowIds = txsPendingRowIds.getOrDefault(txId, Collections.emptySet());
+        Set<RowId> pendingRowIds = txsPendingRowIds.getOrDefault(txId, Collections.emptySet());
 
-            if (cmd.commit()) {
+        if (cmd.commit()) {
+            storage.runConsistently(() -> {
                 pendingRowIds.forEach(rowId -> storage.commitWrite(rowId, cmd.commitTimestamp().asHybridTimestamp()));
-            } else {
-                pendingRowIds.forEach(storage::abortWrite);
-            }
 
-            txsPendingRowIds.remove(txId);
+                txsPendingRowIds.remove(txId);
 
-            storage.lastApplied(commandIndex, commandTerm);
+                storage.lastApplied(commandIndex, commandTerm);
 
-            return null;
-        });
+                return null;
+            });
+        } else {
+            storageUpdateHandler.handleTransactionAbortion(pendingRowIds, () -> {
+                // on replication callback
+                txsPendingRowIds.remove(txId);
+
+                storage.lastApplied(commandIndex, commandTerm);
+            });
+        }
     }
 
     /**
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/outgoing/SnapshotAwarePartitionDataStorage.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/outgoing/SnapshotAwarePartitionDataStorage.java
index 2376e93b2b..7af2db5260 100644
--- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/outgoing/SnapshotAwarePartitionDataStorage.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/outgoing/SnapshotAwarePartitionDataStorage.java
@@ -24,11 +24,13 @@ import org.apache.ignite.internal.schema.TableRow;
 import org.apache.ignite.internal.storage.MvPartitionStorage;
 import org.apache.ignite.internal.storage.MvPartitionStorage.WriteClosure;
 import org.apache.ignite.internal.storage.RaftGroupConfiguration;
+import org.apache.ignite.internal.storage.ReadResult;
 import org.apache.ignite.internal.storage.RowId;
 import org.apache.ignite.internal.storage.StorageException;
 import org.apache.ignite.internal.storage.TxIdMismatchException;
 import org.apache.ignite.internal.table.distributed.raft.PartitionDataStorage;
 import org.apache.ignite.internal.table.distributed.raft.snapshot.PartitionKey;
+import org.apache.ignite.internal.util.Cursor;
 import org.jetbrains.annotations.Nullable;
 import org.jetbrains.annotations.TestOnly;
 
@@ -129,6 +131,13 @@ public class SnapshotAwarePartitionDataStorage implements PartitionDataStorage {
         partitionStorage.commitWrite(rowId, timestamp);
     }
 
+    @Override
+    public Cursor<ReadResult> scanVersions(RowId rowId) throws StorageException {
+        handleSnapshotInterference(rowId);
+
+        return partitionStorage.scanVersions(rowId);
+    }
+
     private void handleSnapshotInterference(RowId rowId) {
         PartitionSnapshots partitionSnapshots = getPartitionSnapshots();
 
diff --git a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/IndexCleanupTest.java b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/IndexCleanupTest.java
new file mode 100644
index 0000000000..f7447f923e
--- /dev/null
+++ b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/IndexCleanupTest.java
@@ -0,0 +1,370 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.table.distributed;
+
+import static java.util.Collections.singletonMap;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.contains;
+import static org.hamcrest.Matchers.not;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import org.apache.ignite.distributed.TestPartitionDataStorage;
+import org.apache.ignite.internal.hlc.HybridClock;
+import org.apache.ignite.internal.hlc.HybridClockImpl;
+import org.apache.ignite.internal.schema.BinaryConverter;
+import org.apache.ignite.internal.schema.BinaryRow;
+import org.apache.ignite.internal.schema.BinaryTuple;
+import org.apache.ignite.internal.schema.BinaryTupleSchema;
+import org.apache.ignite.internal.schema.Column;
+import org.apache.ignite.internal.schema.NativeTypes;
+import org.apache.ignite.internal.schema.SchemaDescriptor;
+import org.apache.ignite.internal.schema.TableRow;
+import org.apache.ignite.internal.schema.TableRowConverter;
+import org.apache.ignite.internal.schema.marshaller.KvMarshaller;
+import org.apache.ignite.internal.schema.marshaller.MarshallerException;
+import org.apache.ignite.internal.schema.marshaller.MarshallerFactory;
+import org.apache.ignite.internal.schema.marshaller.reflection.ReflectionMarshallerFactory;
+import org.apache.ignite.internal.storage.RowId;
+import org.apache.ignite.internal.storage.impl.TestMvPartitionStorage;
+import org.apache.ignite.internal.storage.index.HashIndexDescriptor;
+import org.apache.ignite.internal.storage.index.HashIndexDescriptor.HashIndexColumnDescriptor;
+import org.apache.ignite.internal.storage.index.SortedIndexDescriptor;
+import org.apache.ignite.internal.storage.index.SortedIndexDescriptor.SortedIndexColumnDescriptor;
+import org.apache.ignite.internal.storage.index.impl.TestHashIndexStorage;
+import org.apache.ignite.internal.storage.index.impl.TestSortedIndexStorage;
+import org.apache.ignite.internal.table.distributed.replicator.TablePartitionId;
+import org.apache.ignite.lang.IgniteException;
+import org.jetbrains.annotations.Nullable;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.EnumSource;
+
+/** Tests index cleanup on data removal and transaction abort. */
+public class IndexCleanupTest {
+    /** Default reflection marshaller factory. */
+    private static final MarshallerFactory MARSHALLER_FACTORY = new ReflectionMarshallerFactory();
+
+    private static final SchemaDescriptor SCHEMA_DESCRIPTOR = new SchemaDescriptor(1, new Column[]{
+            new Column("INTKEY", NativeTypes.INT32, false),
+            new Column("STRKEY", NativeTypes.STRING, false),
+    }, new Column[]{
+            new Column("INTVAL", NativeTypes.INT32, false),
+            new Column("STRVAL", NativeTypes.STRING, false),
+    });
+
+    private static final BinaryTupleSchema TUPLE_SCHEMA = BinaryTupleSchema.createRowSchema(SCHEMA_DESCRIPTOR);
+
+    private static final BinaryTupleSchema PK_INDEX_SCHEMA = BinaryTupleSchema.createKeySchema(SCHEMA_DESCRIPTOR);
+
+    private static final BinaryConverter PK_INDEX_BINARY_ROW_CONVERTER = BinaryConverter.forKey(SCHEMA_DESCRIPTOR);
+    private static final TableRowConverter PK_INDEX_BINARY_TUPLE_CONVERTER = new TableRowConverter(TUPLE_SCHEMA, PK_INDEX_SCHEMA);
+
+    private static final int[] USER_INDEX_COLS = {
+            SCHEMA_DESCRIPTOR.column("INTVAL").schemaIndex(),
+            SCHEMA_DESCRIPTOR.column("STRVAL").schemaIndex()
+    };
+
+    private static final BinaryTupleSchema USER_INDEX_SCHEMA = BinaryTupleSchema.createSchema(SCHEMA_DESCRIPTOR, USER_INDEX_COLS);
+
+    private static final BinaryConverter USER_INDEX_BINARY_ROW_CONVERTER = BinaryConverter.forValue(SCHEMA_DESCRIPTOR);
+    private static final TableRowConverter USER_INDEX_BINARY_TUPLE_CONVERTER = new TableRowConverter(TUPLE_SCHEMA, USER_INDEX_SCHEMA);
+
+    /** Key-value {@link BinaryTuple} converter for tests. */
+    private static final BinaryConverter KV_BINARY_CONVERTER = BinaryConverter.forRow(SCHEMA_DESCRIPTOR);
+
+    /** Key-value marshaller for tests. */
+    private static final KvMarshaller<TestKey, TestValue> KV_MARSHALLER
+            = MARSHALLER_FACTORY.create(SCHEMA_DESCRIPTOR, TestKey.class, TestValue.class);
+
+    private static final UUID TX_ID = UUID.randomUUID();
+
+    private static final HybridClock CLOCK = new HybridClockImpl();
+
+    private TestHashIndexStorage pkInnerStorage;
+    private TestSortedIndexStorage sortedInnerStorage;
+    private TestHashIndexStorage hashInnerStorage;
+    private TestMvPartitionStorage storage;
+    private TestPartitionDataStorage dataStorage;
+    private StorageUpdateHandler storageUpdateHandler;
+
+    @BeforeEach
+    void setUp() {
+        UUID pkIndexId = UUID.randomUUID();
+        UUID sortedIndexId = UUID.randomUUID();
+        UUID hashIndexId = UUID.randomUUID();
+
+        pkInnerStorage = new TestHashIndexStorage(null);
+
+        TableSchemaAwareIndexStorage pkStorage = new TableSchemaAwareIndexStorage(
+                pkIndexId,
+                pkInnerStorage,
+                PK_INDEX_BINARY_ROW_CONVERTER::toTuple,
+                PK_INDEX_BINARY_TUPLE_CONVERTER::toTuple
+        );
+
+        sortedInnerStorage = new TestSortedIndexStorage(new SortedIndexDescriptor(sortedIndexId, List.of(
+                new SortedIndexColumnDescriptor("INTVAL", NativeTypes.INT32, false, true),
+                new SortedIndexColumnDescriptor("STRVAL", NativeTypes.STRING, false, true)
+        )));
+
+        TableSchemaAwareIndexStorage sortedIndexStorage = new TableSchemaAwareIndexStorage(
+                sortedIndexId,
+                sortedInnerStorage,
+                USER_INDEX_BINARY_ROW_CONVERTER::toTuple,
+                USER_INDEX_BINARY_TUPLE_CONVERTER::toTuple
+        );
+
+        hashInnerStorage = new TestHashIndexStorage(new HashIndexDescriptor(hashIndexId, List.of(
+                new HashIndexColumnDescriptor("INTVAL", NativeTypes.INT32, false),
+                new HashIndexColumnDescriptor("STRVAL", NativeTypes.STRING, false)
+        )));
+
+        TableSchemaAwareIndexStorage hashIndexStorage = new TableSchemaAwareIndexStorage(
+                hashIndexId,
+                hashInnerStorage,
+                USER_INDEX_BINARY_ROW_CONVERTER::toTuple,
+                USER_INDEX_BINARY_TUPLE_CONVERTER::toTuple
+        );
+
+        storage = new TestMvPartitionStorage(1);
+
+        dataStorage = new TestPartitionDataStorage(storage);
+
+        storageUpdateHandler = new StorageUpdateHandler(1, dataStorage,
+                () -> Map.of(
+                        pkIndexId, pkStorage,
+                        sortedIndexId, sortedIndexStorage,
+                        hashIndexId, hashIndexStorage
+                )
+        );
+    }
+
+    @ParameterizedTest
+    @EnumSource(AddWrite.class)
+    void testAbort(AddWrite writer) {
+        UUID rowUuid = UUID.randomUUID();
+        RowId rowId = new RowId(1, rowUuid);
+
+        var key = new TestKey(1, "foo");
+        var value = new TestValue(2, "bar");
+        TableRow tableRow = tableRow(key, value);
+
+        writer.addWrite(storageUpdateHandler, rowUuid, tableRow);
+
+        assertEquals(1, storage.rowsCount());
+        assertThat(pkInnerStorage.allRows(), contains(rowId));
+        assertThat(sortedInnerStorage.allRows(), contains(rowId));
+        assertThat(hashInnerStorage.allRows(), contains(rowId));
+
+        storageUpdateHandler.handleTransactionAbortion(Set.of(rowId), () -> {});
+
+        assertEquals(0, storage.rowsCount());
+        assertTrue(pkInnerStorage.allRows().isEmpty());
+        assertTrue(sortedInnerStorage.allRows().isEmpty());
+        assertTrue(hashInnerStorage.allRows().isEmpty());
+    }
+
+    @ParameterizedTest
+    @EnumSource(AddWrite.class)
+    void testTombstoneCleansUpIndexes(AddWrite writer) {
+        UUID rowUuid = UUID.randomUUID();
+
+        var key = new TestKey(1, "foo");
+        var value = new TestValue(2, "bar");
+        TableRow tableRow = tableRow(key, value);
+
+        writer.addWrite(storageUpdateHandler, rowUuid, tableRow);
+        writer.addWrite(storageUpdateHandler, rowUuid, null);
+
+        // Write intent is in the storage.
+        assertEquals(1, storage.rowsCount());
+
+        // But indexes are removed.
+        assertTrue(pkInnerStorage.allRows().isEmpty());
+        assertTrue(sortedInnerStorage.allRows().isEmpty());
+        assertTrue(hashInnerStorage.allRows().isEmpty());
+    }
+
+    @ParameterizedTest
+    @EnumSource(AddWrite.class)
+    void testAbortTombstone(AddWrite writer) {
+        UUID rowUuid = UUID.randomUUID();
+        RowId rowId = new RowId(1, rowUuid);
+
+        var key = new TestKey(1, "foo");
+        var value = new TestValue(2, "bar");
+        TableRow tableRow = tableRow(key, value);
+
+        writer.addWrite(storageUpdateHandler, rowUuid, tableRow);
+        writer.addWrite(storageUpdateHandler, rowUuid, null);
+
+        storageUpdateHandler.handleTransactionAbortion(Set.of(rowId), () -> {});
+
+        assertEquals(0, storage.rowsCount());
+        assertTrue(pkInnerStorage.allRows().isEmpty());
+        assertTrue(sortedInnerStorage.allRows().isEmpty());
+        assertTrue(hashInnerStorage.allRows().isEmpty());
+    }
+
+    @ParameterizedTest
+    @EnumSource(AddWrite.class)
+    void testAbortConsecutiveTxWithMatchingIndexes(AddWrite writer) {
+        UUID rowUuid1 = UUID.randomUUID();
+        UUID rowUuid2 = UUID.randomUUID();
+        RowId rowId1 = new RowId(1, rowUuid1);
+        RowId rowId2 = new RowId(1, rowUuid2);
+
+        var key1 = new TestKey(1, "foo");
+        var key2 = new TestKey(2, "baz");
+        var value = new TestValue(2, "bar");
+        // Both rows share the same value columns, so their user index keys match; only the first row gets committed.
+        writer.addWrite(storageUpdateHandler, rowUuid1, tableRow(key1, value));
+        commitWrite(rowId1);
+
+        writer.addWrite(storageUpdateHandler, rowUuid2, tableRow(key2, value));
+
+        storageUpdateHandler.handleTransactionAbortion(Set.of(rowId2), () -> {});
+
+        assertEquals(1, storage.rowsCount());
+
+        Set<RowId> pkRows = pkInnerStorage.allRows();
+        Set<RowId> sortedRows = sortedInnerStorage.allRows();
+        Set<RowId> hashRows = hashInnerStorage.allRows();
+
+        assertThat(pkRows, contains(rowId1));
+        assertThat(sortedRows, contains(rowId1));
+        assertThat(hashRows, contains(rowId1));
+        // contains() matches the exact element sequence, so absence has to be checked with hasItem() instead.
+        assertThat(pkRows, not(org.hamcrest.Matchers.hasItem(rowId2)));
+        assertThat(sortedRows, not(org.hamcrest.Matchers.hasItem(rowId2)));
+        assertThat(hashRows, not(org.hamcrest.Matchers.hasItem(rowId2)));
+    }
+
+    @ParameterizedTest
+    @EnumSource(AddWrite.class)
+    void testIndexNotRemovedOnTombstone(AddWrite writer) {
+        UUID rowUuid = UUID.randomUUID();
+        RowId rowId = new RowId(1, rowUuid);
+
+        var key = new TestKey(1, "foo");
+        var value = new TestValue(2, "bar");
+        TableRow tableRow = tableRow(key, value);
+
+        writer.addWrite(storageUpdateHandler, rowUuid, tableRow);
+        commitWrite(rowId);
+
+        writer.addWrite(storageUpdateHandler, rowUuid, null);
+
+        storageUpdateHandler.handleTransactionAbortion(Set.of(rowId), () -> {});
+
+        assertEquals(1, storage.rowsCount());
+        assertThat(pkInnerStorage.allRows(), contains(rowId));
+        assertThat(sortedInnerStorage.allRows(), contains(rowId));
+        assertThat(hashInnerStorage.allRows(), contains(rowId));
+    }
+
+    /** Enum that encapsulates update API. */
+    enum AddWrite {
+        /** Uses update api. */
+        USE_UPDATE {
+            @Override
+            void addWrite(StorageUpdateHandler handler, TablePartitionId partitionId, UUID rowUuid, @Nullable TableRow row) {
+                handler.handleUpdate(
+                        TX_ID,
+                        rowUuid,
+                        partitionId,
+                        row == null ? null : row.byteBuffer(),
+                        (unused) -> {}
+                );
+            }
+        },
+        /** Uses updateAll api. */
+        USE_UPDATE_ALL {
+            @Override
+            void addWrite(StorageUpdateHandler handler, TablePartitionId partitionId, UUID rowUuid, @Nullable TableRow row) {
+                handler.handleUpdateAll(
+                        TX_ID,
+                        singletonMap(rowUuid, row == null ? null : row.byteBuffer()),
+                        partitionId,
+                        (unused) -> {}
+                );
+            }
+        };
+
+        void addWrite(StorageUpdateHandler handler, UUID rowUuid, @Nullable TableRow row) {
+            TablePartitionId tablePartitionId = new TablePartitionId(UUID.randomUUID(), 1);
+
+            addWrite(handler, tablePartitionId, rowUuid, row);
+        }
+
+        abstract void addWrite(StorageUpdateHandler handler, TablePartitionId partitionId, UUID rowUuid, @Nullable TableRow row);
+    }
+
+    private static BinaryRow binaryRow(TestKey key, TestValue value) {
+        try {
+            return KV_MARSHALLER.marshal(key, value);
+        } catch (MarshallerException e) {
+            throw new IgniteException(e);
+        }
+    }
+
+    private static TableRow tableRow(TestKey key, TestValue value) {
+        return TableRowConverter.fromBinaryRow(binaryRow(key, value), KV_BINARY_CONVERTER);
+    }
+
+    private void commitWrite(RowId rowId) {
+        storage.runConsistently(() -> {
+            storage.commitWrite(rowId, CLOCK.now());
+
+            return null;
+        });
+    }
+
+    private static class TestKey {
+        int intKey;
+
+        String strKey;
+
+        TestKey() {
+        }
+
+        TestKey(int intKey, String strKey) {
+            this.intKey = intKey;
+            this.strKey = strKey;
+        }
+    }
+
+    private static class TestValue {
+        Integer intVal;
+
+        String strVal;
+
+        TestValue() {
+        }
+
+        TestValue(Integer intVal, String strVal) {
+            this.intVal = intVal;
+            this.strVal = strVal;
+        }
+    }
+}
diff --git a/modules/table/src/testFixtures/java/org/apache/ignite/distributed/TestPartitionDataStorage.java b/modules/table/src/testFixtures/java/org/apache/ignite/distributed/TestPartitionDataStorage.java
index 0c937c1f4b..b5ecfd78ba 100644
--- a/modules/table/src/testFixtures/java/org/apache/ignite/distributed/TestPartitionDataStorage.java
+++ b/modules/table/src/testFixtures/java/org/apache/ignite/distributed/TestPartitionDataStorage.java
@@ -26,10 +26,12 @@ import org.apache.ignite.internal.schema.TableRow;
 import org.apache.ignite.internal.storage.MvPartitionStorage;
 import org.apache.ignite.internal.storage.MvPartitionStorage.WriteClosure;
 import org.apache.ignite.internal.storage.RaftGroupConfiguration;
+import org.apache.ignite.internal.storage.ReadResult;
 import org.apache.ignite.internal.storage.RowId;
 import org.apache.ignite.internal.storage.StorageException;
 import org.apache.ignite.internal.storage.TxIdMismatchException;
 import org.apache.ignite.internal.table.distributed.raft.PartitionDataStorage;
+import org.apache.ignite.internal.util.Cursor;
 import org.jetbrains.annotations.Nullable;
 
 /**
@@ -106,6 +108,11 @@ public class TestPartitionDataStorage implements PartitionDataStorage {
         partitionStorage.commitWrite(rowId, timestamp);
     }
 
+    @Override
+    public Cursor<ReadResult> scanVersions(RowId rowId) throws StorageException {
+        return partitionStorage.scanVersions(rowId);
+    }
+
     @Override
     public MvPartitionStorage getStorage() {
         return partitionStorage;