You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by vp...@apache.org on 2022/10/28 15:22:42 UTC

[ignite-3] branch ignite-3.0.0-beta1 updated: IGNITE-17967 RO writeIntent resolution tests hang up in case of multi node cluster (#1255)

This is an automated email from the ASF dual-hosted git repository.

vpyatkov pushed a commit to branch ignite-3.0.0-beta1
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/ignite-3.0.0-beta1 by this push:
     new 6ff8e032e9 IGNITE-17967 RO writeIntent resolution tests hang up in case of multi node cluster (#1255)
6ff8e032e9 is described below

commit 6ff8e032e9c80630e667f043805212e3299adcaf
Author: Vladislav Pyatkov <vl...@gmail.com>
AuthorDate: Fri Oct 28 17:30:21 2022 +0300

    IGNITE-17967 RO writeIntent resolution tests hang up in case of multi node cluster (#1255)
    
    (cherry picked from commit 7d4bf9747dace1471214aea56caa3148b76a20eb)
---
 .../internal/sql/engine/SqlQueryProcessor.java     |   3 +-
 .../replicator/PartitionReplicaListener.java       | 240 +++++++++++++--------
 .../ignite/internal/table/TxAbstractTest.java      |  10 +-
 .../apache/ignite/internal/table/TxLocalTest.java  |  15 +-
 .../table/impl/DummyInternalTableImpl.java         |  28 ++-
 5 files changed, 183 insertions(+), 113 deletions(-)

diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/SqlQueryProcessor.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/SqlQueryProcessor.java
index b3fdf34019..8491b3ee99 100644
--- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/SqlQueryProcessor.java
+++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/SqlQueryProcessor.java
@@ -141,7 +141,8 @@ public class SqlQueryProcessor implements QueryProcessor {
     /** Transaction manager. */
     private final TxManager txManager;
 
-    private HybridClock clock;
+    /** Clock. */
+    private final HybridClock clock;
 
     /** Constructor. */
     public SqlQueryProcessor(
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
index 42334a4117..4953287a6a 100644
--- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
@@ -241,25 +241,33 @@ public class PartitionReplicaListener implements ReplicaListener {
                     } else if (request instanceof ReadWriteMultiRowReplicaRequest) {
                         return processMultiEntryAction((ReadWriteMultiRowReplicaRequest) request);
                     } else if (request instanceof ReadWriteSwapRowReplicaRequest) {
-                        return processTwoEntriesAction((ReadWriteSwapRowReplicaRequest) request);
+                        return processTwoEntriesAction((ReadWriteSwapRowReplicaRequest) request)
+                                .thenApply(Function.identity());
                     } else if (request instanceof ReadWriteScanRetrieveBatchReplicaRequest) {
-                        return processScanRetrieveBatchAction((ReadWriteScanRetrieveBatchReplicaRequest) request);
+                        return processScanRetrieveBatchAction((ReadWriteScanRetrieveBatchReplicaRequest) request)
+                                .thenApply(Function.identity());
                     } else if (request instanceof ReadWriteScanCloseReplicaRequest) {
                         processScanCloseAction((ReadWriteScanCloseReplicaRequest) request);
 
                         return completedFuture(null);
                     } else if (request instanceof TxFinishReplicaRequest) {
-                        return processTxFinishAction((TxFinishReplicaRequest) request);
+                        return processTxFinishAction((TxFinishReplicaRequest) request)
+                                .thenApply(Function.identity());
                     } else if (request instanceof TxCleanupReplicaRequest) {
-                        return processTxCleanupAction((TxCleanupReplicaRequest) request);
+                        return processTxCleanupAction((TxCleanupReplicaRequest) request)
+                                .thenApply(Function.identity());
                     } else if (request instanceof ReadOnlySingleRowReplicaRequest) {
-                        return processReadOnlySingleEntryAction((ReadOnlySingleRowReplicaRequest) request, isPrimary);
+                        return processReadOnlySingleEntryAction((ReadOnlySingleRowReplicaRequest) request, isPrimary)
+                                .thenApply(Function.identity());
                     } else if (request instanceof ReadOnlyMultiRowReplicaRequest) {
-                        return processReadOnlyMultiEntryAction((ReadOnlyMultiRowReplicaRequest) request, isPrimary);
+                        return processReadOnlyMultiEntryAction((ReadOnlyMultiRowReplicaRequest) request, isPrimary)
+                                .thenApply(Function.identity());
                     } else if (request instanceof ReadOnlyScanRetrieveBatchReplicaRequest) {
-                        return processReadOnlyScanRetrieveBatchAction((ReadOnlyScanRetrieveBatchReplicaRequest) request, isPrimary);
+                        return processReadOnlyScanRetrieveBatchAction((ReadOnlyScanRetrieveBatchReplicaRequest) request, isPrimary)
+                                .thenApply(Function.identity());
                     } else if (request instanceof ReplicaSafeTimeSyncRequest) {
-                        return processReplicaSafeTimeSyncRequest((ReplicaSafeTimeSyncRequest) request);
+                        return processReplicaSafeTimeSyncRequest((ReplicaSafeTimeSyncRequest) request)
+                                .thenApply(Function.identity());
                     } else {
                         throw new UnsupportedReplicaRequestException(request.getClass());
                     }
@@ -327,7 +335,7 @@ public class PartitionReplicaListener implements ReplicaListener {
      * @param isPrimary Whether the given replica is primary.
      * @return Result future.
      */
-    private CompletableFuture<Object> processReadOnlyScanRetrieveBatchAction(
+    private CompletableFuture<ArrayList<BinaryRow>> processReadOnlyScanRetrieveBatchAction(
             ReadOnlyScanRetrieveBatchReplicaRequest request,
             Boolean isPrimary
     ) {
@@ -339,24 +347,59 @@ public class PartitionReplicaListener implements ReplicaListener {
 
         IgniteUuid cursorId = new IgniteUuid(txId, request.scanId());
 
-        ArrayList<BinaryRow> batchRows = new ArrayList<>(batchCount);
+        CompletableFuture<Void> safeReadFuture = isPrimary ? completedFuture(null) : safeTime.waitFor(readTimestamp);
+
+        return safeReadFuture.thenCompose(unused -> retrieveExactEntriesUntilCursorEmpty(readTimestamp, cursorId, batchCount));
+    }
 
+    /**
+     * Extracts the exact number of entries, or fewer if the cursor becomes empty, from a cursor at the specified time.
+     *
+     * @param readTimestamp Timestamp of the moment at which the data will be extracted.
+     * @param cursorId Cursor id.
+     * @param count Number of entries that will be extracted.
+     * @return Result future.
+     */
+    private CompletableFuture<ArrayList<BinaryRow>> retrieveExactEntriesUntilCursorEmpty(
+            HybridTimestamp readTimestamp,
+            IgniteUuid cursorId,
+            int count
+    ) {
         @SuppressWarnings("resource") PartitionTimestampCursor cursor = cursors.computeIfAbsent(cursorId,
                 id -> mvDataStorage.scan(HybridTimestamp.MAX_VALUE));
 
-        CompletableFuture<Void> safeReadFuture = isPrimary ? completedFuture(null) : safeTime.waitFor(readTimestamp);
+        ArrayList<CompletableFuture<BinaryRow>> resolutionFuts = new ArrayList<>(count);
 
-        // TODO https://issues.apache.org/jira/browse/IGNITE-17824 Dedicated thread pool should be used.
-        return safeReadFuture.thenApplyAsync(ignored -> {
-            while (batchRows.size() < batchCount && cursor.hasNext()) {
-                BinaryRow resolvedReadResult = resolveReadResult(cursor.next(), readTimestamp, () -> cursor.committed(readTimestamp));
+        while (resolutionFuts.size() < count && cursor.hasNext()) {
+            ReadResult readResult = cursor.next();
+            HybridTimestamp newestCommitTimestamp = readResult.newestCommitTimestamp();
+
+            BinaryRow candidate =
+                    newestCommitTimestamp == null || !readResult.isWriteIntent() ? null : cursor.committed(newestCommitTimestamp);
+
+            resolutionFuts.add(resolveReadResult(readResult, readTimestamp, () -> candidate));
+        }
+
+        return allOf(resolutionFuts.toArray(new CompletableFuture[0])).thenCompose(unused -> {
+            ArrayList<BinaryRow> rows = new ArrayList<>(count);
+
+            for (CompletableFuture<BinaryRow> resolutionFut : resolutionFuts) {
+                BinaryRow resolvedReadResult = resolutionFut.join();
 
                 if (resolvedReadResult != null) {
-                    batchRows.add(resolvedReadResult);
+                    rows.add(resolvedReadResult);
                 }
             }
 
-            return batchRows;
+            if (rows.size() < count && cursor.hasNext()) {
+                return retrieveExactEntriesUntilCursorEmpty(readTimestamp, cursorId, count - rows.size()).thenApply(binaryRows -> {
+                    rows.addAll(binaryRows);
+
+                    return rows;
+                });
+            } else {
+                return completedFuture(rows);
+            }
         });
     }
 
@@ -367,8 +410,8 @@ public class PartitionReplicaListener implements ReplicaListener {
      * @param isPrimary Whether the given replica is primary.
      * @return Result future.
      */
-    private CompletableFuture<Object> processReadOnlySingleEntryAction(ReadOnlySingleRowReplicaRequest request, Boolean isPrimary) {
-        BinaryRow tableRow = request.binaryRow();
+    private CompletableFuture<BinaryRow> processReadOnlySingleEntryAction(ReadOnlySingleRowReplicaRequest request, Boolean isPrimary) {
+        BinaryRow searchRow = request.binaryRow();
         HybridTimestamp readTimestamp = request.readTimestamp();
 
         if (request.requestType() != RequestType.RO_GET) {
@@ -378,11 +421,7 @@ public class PartitionReplicaListener implements ReplicaListener {
 
         CompletableFuture<Void> safeReadFuture = isPrimary ? completedFuture(null) : safeTime.waitFor(request.readTimestamp());
 
-        // TODO https://issues.apache.org/jira/browse/IGNITE-17824 Dedicated thread pool should be used.
-        return safeReadFuture.thenApplyAsync(ignored -> {
-            //TODO: IGNITE-17868 Integrate indexes into rowIds resolution along with proper lock management on search rows.
-            return resolveRowByPk(tableRow, readTimestamp, (rowId, binaryRow) -> binaryRow);
-        });
+        return safeReadFuture.thenCompose(unused -> resolveRowByPk(searchRow, readTimestamp));
     }
 
     /**
@@ -392,7 +431,13 @@ public class PartitionReplicaListener implements ReplicaListener {
      * @param isPrimary Whether the given replica is primary.
      * @return Result future.
      */
-    private CompletableFuture<Object> processReadOnlyMultiEntryAction(ReadOnlyMultiRowReplicaRequest request, Boolean isPrimary) {
+    private CompletableFuture<ArrayList<BinaryRow>> processReadOnlyMultiEntryAction(
+            ReadOnlyMultiRowReplicaRequest request,
+            Boolean isPrimary
+    ) {
+        Collection<BinaryRow> searchRows = request.binaryRows();
+        HybridTimestamp readTimestamp = request.readTimestamp();
+
         if (request.requestType() != RequestType.RO_GET_ALL) {
             throw new IgniteInternalException(Replicator.REPLICA_COMMON_ERR,
                     format("Unknown single request [actionType={}]", request.requestType()));
@@ -400,17 +445,28 @@ public class PartitionReplicaListener implements ReplicaListener {
 
         CompletableFuture<Void> safeReadFuture = isPrimary ? completedFuture(null) : safeTime.waitFor(request.readTimestamp());
 
-        // TODO https://issues.apache.org/jira/browse/IGNITE-17824 Dedicated thread pool should be used.
-        return safeReadFuture.thenApplyAsync(ignored -> {
-            ArrayList<BinaryRow> result = new ArrayList<>(request.binaryRows().size());
+        return safeReadFuture.thenCompose(unused -> {
+            ArrayList<CompletableFuture<BinaryRow>> resolutionFuts = new ArrayList<>(searchRows.size());
 
-            for (BinaryRow searchRow : request.binaryRows()) {
-                BinaryRow row = resolveRowByPk(searchRow, request.readTimestamp(), (rowId, binaryRow) -> binaryRow);
+            for (BinaryRow searchRow : searchRows) {
+                CompletableFuture<BinaryRow> fut = resolveRowByPk(searchRow, readTimestamp);
 
-                result.add(row);
+                resolutionFuts.add(fut);
             }
 
-            return result;
+            return allOf(resolutionFuts.toArray(new CompletableFuture[0])).thenApply(unused1 -> {
+                ArrayList<BinaryRow> result = new ArrayList<>(resolutionFuts.size());
+
+                for (CompletableFuture<BinaryRow> resolutionFut : resolutionFuts) {
+                    BinaryRow resolvedReadResult = resolutionFut.join();
+
+                    if (resolvedReadResult != null) {
+                        result.add(resolvedReadResult);
+                    }
+                }
+
+                return result;
+            });
         });
     }
 
@@ -420,7 +476,7 @@ public class PartitionReplicaListener implements ReplicaListener {
      * @param request Request.
      * @return Future.
      */
-    private CompletionStage<Object> processReplicaSafeTimeSyncRequest(ReplicaSafeTimeSyncRequest request) {
+    private CompletionStage<Void> processReplicaSafeTimeSyncRequest(ReplicaSafeTimeSyncRequest request) {
         return raftClient.run(new SafeTimeSyncCommand());
     }
 
@@ -489,7 +545,7 @@ public class PartitionReplicaListener implements ReplicaListener {
      * @param request Scan retrieve batch request operation.
      * @return Listener response.
      */
-    private CompletableFuture<Object> processScanRetrieveBatchAction(ReadWriteScanRetrieveBatchReplicaRequest request) {
+    private CompletableFuture<ArrayList<BinaryRow>> processScanRetrieveBatchAction(ReadWriteScanRetrieveBatchReplicaRequest request) {
         UUID txId = request.transactionId();
         int batchCount = request.batchSize();
 
@@ -525,7 +581,7 @@ public class PartitionReplicaListener implements ReplicaListener {
      * @return future result of the operation.
      */
     // TODO: need to properly handle primary replica changes https://issues.apache.org/jira/browse/IGNITE-17615
-    private CompletableFuture<Object> processTxFinishAction(TxFinishReplicaRequest request) {
+    private CompletableFuture<Void> processTxFinishAction(TxFinishReplicaRequest request) {
         List<ReplicationGroupId> aggregatedGroupIds = request.groups().values().stream()
                 .flatMap(List::stream).map(IgniteBiTuple::get1).collect(Collectors.toList());
 
@@ -552,7 +608,7 @@ public class PartitionReplicaListener implements ReplicaListener {
                         )
         );
 
-        return allOf(cleanupFutures).thenApply(ignored -> null);
+        return allOf(cleanupFutures);
     }
 
     /**
@@ -601,7 +657,7 @@ public class PartitionReplicaListener implements ReplicaListener {
      * @return CompletableFuture of void.
      */
     // TODO: need to properly handle primary replica changes https://issues.apache.org/jira/browse/IGNITE-17615
-    private CompletableFuture processTxCleanupAction(TxCleanupReplicaRequest request) {
+    private CompletableFuture<Void> processTxCleanupAction(TxCleanupReplicaRequest request) {
         try {
             closeAllTransactionCursors(request.txId());
         } catch (Exception e) {
@@ -613,50 +669,6 @@ public class PartitionReplicaListener implements ReplicaListener {
                 .thenRun(() -> lockManager.locks(request.txId()).forEachRemaining(lockManager::release));
     }
 
-    /**
-     * Finds the row and its identifier by given pk search row.
-     *
-     * @param tableRow A bytes representing a primary tableRow.
-     * @param ts A timestamp regarding which we need to resolve the given row.
-     * @param action An action to perform on a resolved row.
-     * @param <T> A type of the value returned by action.
-     * @return Result of the given action.
-     */
-    private <T> T resolveRowByPk(
-            BinaryRow tableRow,
-            HybridTimestamp ts,
-            BiFunction<@Nullable RowId, @Nullable BinaryRow, T> action
-    ) {
-        try (Cursor<RowId> cursor = pkIndexStorage.get().get(tableRow)) {
-            for (RowId rowId : cursor) {
-                ReadResult readResult = mvDataStorage.read(rowId, ts);
-
-                BinaryRow row = resolveReadResult(readResult, ts, () -> {
-                    if (readResult.newestCommitTimestamp() == null) {
-                        return null;
-                    }
-
-                    ReadResult committedReadResult = mvDataStorage.read(rowId, readResult.newestCommitTimestamp());
-
-                    assert !committedReadResult.isWriteIntent() :
-                            "The result is not committed [rowId=" + rowId + ", timestamp="
-                                    + readResult.newestCommitTimestamp() + ']';
-
-                    return committedReadResult.binaryRow();
-                });
-
-                if (row != null && row.hasValue()) {
-                    return action.apply(rowId, row);
-                }
-            }
-
-            return action.apply(null, null);
-        } catch (Exception e) {
-            throw new IgniteInternalException(Replicator.REPLICA_COMMON_ERR,
-                    format("Unable to close cursor [tableId={}]", tableId), e);
-        }
-    }
-
     /**
      * Finds the row and its identifier by given pk search row.
      *
@@ -694,6 +706,42 @@ public class PartitionReplicaListener implements ReplicaListener {
                 });
     }
 
+    /**
+     * Finds the row by the given primary key search row and resolves it for the given timestamp.
+     *
+     * @param searchKey A bytes representing a primary key.
+     * @param ts A timestamp regarding which we need to resolve the given row.
+     * @return Future of the resolved binary row; completed with {@code null} if the primary key index has no entry for the key.
+     */
+    private CompletableFuture<BinaryRow> resolveRowByPk(BinaryRow searchKey, HybridTimestamp ts) {
+        try (Cursor<RowId> cursor = pkIndexStorage.get().get(searchKey)) {
+            for (RowId rowId : cursor) {
+                ReadResult readResult = mvDataStorage.read(rowId, ts);
+
+                return resolveReadResult(readResult, ts, () -> {
+                    HybridTimestamp newestCommitTimestamp = readResult.newestCommitTimestamp();
+
+                    if (newestCommitTimestamp == null) {
+                        return null;
+                    }
+
+                    ReadResult committedReadResult = mvDataStorage.read(rowId, newestCommitTimestamp);
+
+                    assert !committedReadResult.isWriteIntent() :
+                            "The result is not committed [rowId=" + rowId + ", timestamp="
+                                    + newestCommitTimestamp + ']';
+
+                    return committedReadResult.binaryRow();
+                });
+            }
+
+            return completedFuture(null);
+        } catch (Exception e) {
+            throw new IgniteInternalException(Replicator.REPLICA_COMMON_ERR,
+                    format("Unable to close cursor [tableId={}]", tableId), e);
+        }
+    }
+
     /**
      * Tests row values for equality.
      *
@@ -1122,7 +1170,7 @@ public class PartitionReplicaListener implements ReplicaListener {
             locks[idx++] = locker.locksForInsert(txId, tableRow, rowId);
         }
 
-        return CompletableFuture.allOf(locks);
+        return allOf(locks);
     }
 
     private CompletableFuture<?> takeRemoveLockOnIndexes(BinaryRow tableRow, RowId rowId, UUID txId) {
@@ -1139,7 +1187,7 @@ public class PartitionReplicaListener implements ReplicaListener {
             locks[idx++] = locker.locksForRemove(txId, tableRow, rowId);
         }
 
-        return CompletableFuture.allOf(locks);
+        return allOf(locks);
     }
 
     /**
@@ -1193,7 +1241,7 @@ public class PartitionReplicaListener implements ReplicaListener {
      * @param request Two actions operation request.
      * @return Listener response.
      */
-    private CompletableFuture<Object> processTwoEntriesAction(ReadWriteSwapRowReplicaRequest request) {
+    private CompletableFuture<Boolean> processTwoEntriesAction(ReadWriteSwapRowReplicaRequest request) {
         BinaryRow newRow = request.binaryRow();
         BinaryRow expectedRow = request.oldBinaryRow();
         TablePartitionId commitPartitionId = request.commitPartitionId();
@@ -1278,7 +1326,7 @@ public class PartitionReplicaListener implements ReplicaListener {
                                 if (expectedTerm.equals(currentTerm)) {
                                     return completedFuture(null);
                                 } else {
-                                    return CompletableFuture.failedFuture(new PrimaryReplicaMissException(expectedTerm, currentTerm));
+                                    return failedFuture(new PrimaryReplicaMissException(expectedTerm, currentTerm));
                                 }
                             }
                     );
@@ -1297,7 +1345,8 @@ public class PartitionReplicaListener implements ReplicaListener {
      * @return Resolved binary row.
      */
     private BinaryRow resolveReadResult(ReadResult readResult, UUID txId) {
-        return resolveReadResult(readResult, txId, null, null);
+        // Joining here is safe: read result resolution for an RW transaction cannot lead to a network request.
+        return resolveReadResult(readResult, txId, null, null).join();
     }
 
     /**
@@ -1306,10 +1355,13 @@ public class PartitionReplicaListener implements ReplicaListener {
      * @param readResult Read result to resolve.
      * @param timestamp Timestamp.
      * @param lastCommitted Action to get the latest committed row.
-     * @return Resolved binary row.
+     * @return Future of the resolved binary row.
      */
-    private BinaryRow resolveReadResult(ReadResult readResult, HybridTimestamp timestamp, Supplier<BinaryRow> lastCommitted) {
-
+    private CompletableFuture<BinaryRow> resolveReadResult(
+            ReadResult readResult,
+            HybridTimestamp timestamp,
+            Supplier<BinaryRow> lastCommitted
+    ) {
         return resolveReadResult(readResult, null, timestamp, lastCommitted);
     }
 
@@ -1327,9 +1379,9 @@ public class PartitionReplicaListener implements ReplicaListener {
      * @param txId Nullable transaction id, should be provided if resolution is performed within the context of RW transaction.
      * @param timestamp Timestamp is used in RO transaction only.
      * @param lastCommitted Action to get the latest committed row, it is used in RO transaction only.
-     * @return Resolved binary row.
+     * @return Future of the resolved binary row.
      */
-    private BinaryRow resolveReadResult(
+    private CompletableFuture<BinaryRow> resolveReadResult(
             ReadResult readResult,
             @Nullable UUID txId,
             @Nullable HybridTimestamp timestamp,
@@ -1344,7 +1396,7 @@ public class PartitionReplicaListener implements ReplicaListener {
 
                 if (retrievedResultTxId == null || txId.equals(retrievedResultTxId)) {
                     // Same transaction - return retrieved value. It may be both writeIntent or regular value.
-                    return readResult.binaryRow();
+                    return completedFuture(readResult.binaryRow());
                 } else {
                     // Should never happen, currently, locks prevent reading another transaction intents during RW requests.
                     throw new AssertionError("Mismatched transaction id, expectedTxId={" + txId + "},"
@@ -1352,14 +1404,14 @@ public class PartitionReplicaListener implements ReplicaListener {
                 }
             } else {
                 if (!readResult.isWriteIntent()) {
-                    return readResult.binaryRow();
+                    return completedFuture(readResult.binaryRow());
                 }
 
                 CompletableFuture<BinaryRow> writeIntentResolutionFut = resolveWriteIntentAsync(
                         readResult, timestamp, lastCommitted);
 
                 // RO request.
-                return writeIntentResolutionFut.join();
+                return writeIntentResolutionFut;
             }
         }
     }
diff --git a/modules/table/src/test/java/org/apache/ignite/internal/table/TxAbstractTest.java b/modules/table/src/test/java/org/apache/ignite/internal/table/TxAbstractTest.java
index 0d9cc2d77b..844dab2eb5 100644
--- a/modules/table/src/test/java/org/apache/ignite/internal/table/TxAbstractTest.java
+++ b/modules/table/src/test/java/org/apache/ignite/internal/table/TxAbstractTest.java
@@ -90,7 +90,7 @@ public abstract class TxAbstractTest extends IgniteAbstractTest {
     );
 
     /** Table ID test value. */
-    public static final UUID tableId2 = java.util.UUID.randomUUID();
+    public static final UUID tableId2 = UUID.randomUUID();
 
     protected static SchemaDescriptor CUSTOMERS_SCHEMA = new SchemaDescriptor(
             1,
@@ -1798,7 +1798,6 @@ public abstract class TxAbstractTest extends IgniteAbstractTest {
         assertEquals(100., accounts.recordView().get(readOnlyTx, makeKey(1)).doubleValue("balance"));
     }
 
-    @Disabled("https://issues.apache.org/jira/browse/IGNITE-17967")
     @Test
     public void testReadOnlyGetWriteIntentResolutionUpdate() {
         accounts.recordView().upsert(null, makeValue(1, 100.));
@@ -1822,7 +1821,7 @@ public abstract class TxAbstractTest extends IgniteAbstractTest {
         assertEquals(300., accounts.recordView().get(readOnlyTx2, makeKey(1)).doubleValue("balance"));
     }
 
-    @Disabled("https://issues.apache.org/jira/browse/IGNITE-17967, https://issues.apache.org/jira/browse/IGNITE-17968")
+    @Disabled("https://issues.apache.org/jira/browse/IGNITE-17968")
     @Test
     public void testReadOnlyGetWriteIntentResolutionRemove() {
         accounts.recordView().upsert(null, makeValue(1, 100.));
@@ -1857,7 +1856,6 @@ public abstract class TxAbstractTest extends IgniteAbstractTest {
         validateBalance(retrievedKeys, 100., 200.);
     }
 
-    @Disabled("https://issues.apache.org/jira/browse/IGNITE-17967")
     // TODO: IGNITE-17968 Remove after fix.
     @Test
     public void testReadOnlyPendingWriteIntentSkipped() {
@@ -1876,14 +1874,14 @@ public abstract class TxAbstractTest extends IgniteAbstractTest {
         tx.commit();
 
         Collection<Tuple> retrievedKeys2 = accounts.recordView().getAll(readOnlyTx, List.of(makeKey(1), makeKey(2)));
-        validateBalance(retrievedKeys2, 100., 300.);
+        validateBalance(retrievedKeys2, 100., 200.);
 
         Transaction readOnlyTx2 = igniteTransactions.readOnly().begin();
         Collection<Tuple> retrievedKeys3 = accounts.recordView().getAll(readOnlyTx2, List.of(makeKey(1), makeKey(2)));
         validateBalance(retrievedKeys3, 100., 300.);
     }
 
-    @Disabled("https://issues.apache.org/jira/browse/IGNITE-17967, https://issues.apache.org/jira/browse/IGNITE-17968")
+    @Disabled("https://issues.apache.org/jira/browse/IGNITE-17968")
     @Test
     public void testReadOnlyPendingWriteIntentSkippedCombined() {
         accounts.recordView().upsert(null, makeValue(1, 100.));
diff --git a/modules/table/src/test/java/org/apache/ignite/internal/table/TxLocalTest.java b/modules/table/src/test/java/org/apache/ignite/internal/table/TxLocalTest.java
index 0c6a57dce7..9809044912 100644
--- a/modules/table/src/test/java/org/apache/ignite/internal/table/TxLocalTest.java
+++ b/modules/table/src/test/java/org/apache/ignite/internal/table/TxLocalTest.java
@@ -19,6 +19,7 @@ package org.apache.ignite.internal.table;
 
 import static org.mockito.Answers.RETURNS_DEEP_STUBS;
 import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.lenient;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
@@ -31,6 +32,7 @@ import org.apache.ignite.internal.replicator.ReplicaService;
 import org.apache.ignite.internal.replicator.ReplicationGroupId;
 import org.apache.ignite.internal.replicator.listener.ReplicaListener;
 import org.apache.ignite.internal.replicator.message.ReplicaRequest;
+import org.apache.ignite.internal.table.distributed.replicator.PlacementDriver;
 import org.apache.ignite.internal.table.impl.DummyInternalTableImpl;
 import org.apache.ignite.internal.table.impl.DummySchemaManagerImpl;
 import org.apache.ignite.internal.tx.LockManager;
@@ -38,6 +40,7 @@ import org.apache.ignite.internal.tx.TxManager;
 import org.apache.ignite.internal.tx.impl.HeapLockManager;
 import org.apache.ignite.internal.tx.impl.IgniteTransactionsImpl;
 import org.apache.ignite.internal.tx.impl.TxManagerImpl;
+import org.apache.ignite.internal.tx.message.TxStateReplicaRequest;
 import org.apache.ignite.network.ClusterService;
 import org.apache.ignite.network.MessagingService;
 import org.apache.ignite.table.Table;
@@ -64,6 +67,7 @@ public class TxLocalTest extends TxAbstractTest {
         lockManager = new HeapLockManager();
 
         ReplicaService replicaSvc = mock(ReplicaService.class, RETURNS_DEEP_STUBS);
+        PlacementDriver placementDriver = mock(PlacementDriver.class, RETURNS_DEEP_STUBS);
 
         Map<ReplicationGroupId, DummyInternalTableImpl> tables = new HashMap<>();
 
@@ -77,15 +81,22 @@ public class TxLocalTest extends TxAbstractTest {
             }
         ).when(replicaSvc).invoke(any(), any());
 
+        doAnswer(invocationOnMock -> {
+            TxStateReplicaRequest request = invocationOnMock.getArgument(1);
+
+            return CompletableFuture.completedFuture(
+                    tables.get(request.groupId()).txStateStorage().getTxStateStorage(0).get(request.txId()));
+        }).when(placementDriver).sendMetaRequest(any(), any());
+
         txManager = new TxManagerImpl(replicaSvc, lockManager, new HybridClockImpl());
 
         igniteTransactions = new IgniteTransactionsImpl(txManager);
 
-        DummyInternalTableImpl table = new DummyInternalTableImpl(replicaSvc, txManager, true);
+        DummyInternalTableImpl table = new DummyInternalTableImpl(replicaSvc, txManager, true, placementDriver);
 
         accounts = new TableImpl(table, new DummySchemaManagerImpl(ACCOUNTS_SCHEMA), lockManager);
 
-        DummyInternalTableImpl table2 = new DummyInternalTableImpl(replicaSvc, txManager, true);
+        DummyInternalTableImpl table2 = new DummyInternalTableImpl(replicaSvc, txManager, true, placementDriver);
 
         customers = new TableImpl(table2, new DummySchemaManagerImpl(CUSTOMERS_SCHEMA), lockManager);
 
diff --git a/modules/table/src/test/java/org/apache/ignite/internal/table/impl/DummyInternalTableImpl.java b/modules/table/src/test/java/org/apache/ignite/internal/table/impl/DummyInternalTableImpl.java
index 3f202bb5f0..4cff2aea18 100644
--- a/modules/table/src/test/java/org/apache/ignite/internal/table/impl/DummyInternalTableImpl.java
+++ b/modules/table/src/test/java/org/apache/ignite/internal/table/impl/DummyInternalTableImpl.java
@@ -50,14 +50,14 @@ import org.apache.ignite.internal.table.distributed.IndexLocker;
 import org.apache.ignite.internal.table.distributed.TableSchemaAwareIndexStorage;
 import org.apache.ignite.internal.table.distributed.raft.PartitionListener;
 import org.apache.ignite.internal.table.distributed.replicator.PartitionReplicaListener;
+import org.apache.ignite.internal.table.distributed.replicator.PlacementDriver;
 import org.apache.ignite.internal.table.distributed.replicator.TablePartitionId;
 import org.apache.ignite.internal.table.distributed.storage.InternalTableImpl;
 import org.apache.ignite.internal.tx.InternalTransaction;
 import org.apache.ignite.internal.tx.TxManager;
 import org.apache.ignite.internal.tx.impl.HeapLockManager;
 import org.apache.ignite.internal.tx.impl.TxManagerImpl;
-import org.apache.ignite.internal.tx.storage.state.TxStateTableStorage;
-import org.apache.ignite.internal.tx.storage.state.test.TestConcurrentHashMapTxStateStorage;
+import org.apache.ignite.internal.tx.storage.state.test.TestConcurrentHashMapTxStateTableStorage;
 import org.apache.ignite.internal.util.Lazy;
 import org.apache.ignite.internal.util.PendingComparableValuesTracker;
 import org.apache.ignite.lang.IgniteBiTuple;
@@ -104,9 +104,15 @@ public class DummyInternalTableImpl extends InternalTableImpl {
      * @param txManager Transaction manager.
      * @param crossTableUsage If this dummy table is going to be used in cross-table tests, it won't mock the calls of ReplicaService
      *                        by itself.
+     * @param placementDriver Placement driver.
      */
-    public DummyInternalTableImpl(ReplicaService replicaSvc, TxManager txManager, boolean crossTableUsage) {
-        this(replicaSvc, new TestMvPartitionStorage(0), txManager, crossTableUsage);
+    public DummyInternalTableImpl(
+            ReplicaService replicaSvc,
+            TxManager txManager,
+            boolean crossTableUsage,
+            PlacementDriver placementDriver
+    ) {
+        this(replicaSvc, new TestMvPartitionStorage(0), txManager, crossTableUsage, placementDriver);
     }
 
     /**
@@ -116,7 +122,7 @@ public class DummyInternalTableImpl extends InternalTableImpl {
      * @param mvPartStorage Multi version partition storage.
      */
     public DummyInternalTableImpl(ReplicaService replicaSvc, MvPartitionStorage mvPartStorage) {
-        this(replicaSvc, mvPartStorage, null, false);
+        this(replicaSvc, mvPartStorage, null, false, null);
     }
 
     /**
@@ -127,12 +133,14 @@ public class DummyInternalTableImpl extends InternalTableImpl {
      * @param txManager Transaction manager, if {@code null}, then default one will be created.
      * @param crossTableUsage If this dummy table is going to be used in cross-table tests, it won't mock the calls of ReplicaService
      *                        by itself.
+     * @param placementDriver Placement driver.
      */
     public DummyInternalTableImpl(
             ReplicaService replicaSvc,
             MvPartitionStorage mvPartStorage,
             @Nullable TxManager txManager,
-            boolean crossTableUsage
+            boolean crossTableUsage,
+            PlacementDriver placementDriver
     ) {
         super(
                 "test",
@@ -143,7 +151,7 @@ public class DummyInternalTableImpl extends InternalTableImpl {
                 addr -> Mockito.mock(ClusterNode.class),
                 txManager == null ? new TxManagerImpl(replicaSvc, new HeapLockManager(), new HybridClockImpl()) : txManager,
                 mock(MvTableStorage.class),
-                mock(TxStateTableStorage.class),
+                new TestConcurrentHashMapTxStateTableStorage(),
                 replicaSvc,
                 new HybridClockImpl()
         );
@@ -242,15 +250,15 @@ public class DummyInternalTableImpl extends InternalTableImpl {
                 pkStorage,
                 clock,
                 new PendingComparableValuesTracker<>(clock.now()),
+                txStateStorage().getOrCreateTxStateStorage(0),
                 null,
-                null,
-                null,
+                placementDriver,
                 peer -> true
         );
 
         partitionListener = new PartitionListener(
                 mvPartStorage,
-                new TestConcurrentHashMapTxStateStorage(),
+                txStateStorage().getOrCreateTxStateStorage(0),
                 this.txManager,
                 () -> Map.of(pkStorage.get().id(), pkStorage.get())
         );