You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by ap...@apache.org on 2023/10/23 08:06:47 UTC

[ignite-3] branch main updated: IGNITE-20042 Check table existence before executing each operation in an RW transaction (#2721)

This is an automated email from the ASF dual-hosted git repository.

apolovtsev pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new 5dc20b82c8 IGNITE-20042 Check table existence before executing each operation in an RW transaction (#2721)
5dc20b82c8 is described below

commit 5dc20b82c8c0c09f66c4c04a98c74cece530d0ef
Author: Roman Puchkovskiy <ro...@gmail.com>
AuthorDate: Mon Oct 23 12:06:41 2023 +0400

    IGNITE-20042 Check table existence before executing each operation in an RW transaction (#2721)
---
 .../ignite/internal/{Hacks.java => Kludges.java}   |   4 +-
 .../testframework/BaseIgniteAbstractTest.java      |   2 +-
 .../ignite/internal/replicator/ReplicaManager.java |   2 +-
 .../internal/readonly/ItReadOnlyTxInPastTest.java  |   2 +-
 .../rebalance/ItRebalanceRecoveryTest.java         |   2 +-
 .../runner/app/ItIgniteNodeRestartTest.java        |   2 +-
 .../schemasync/ItSchemaSyncSingleNodeTest.java     |  64 ++-
 .../internal/table/distributed/TableManager.java   |   2 +-
 .../replicator/CompatValidationResult.java         |  29 +-
 .../replicator/PartitionReplicaListener.java       | 148 ++++---
 .../replicator/SchemaCompatValidator.java          |  68 ++-
 .../distributed/schema/NonHistoricSchemas.java     |   7 +-
 .../distributed/schema/SchemaSyncService.java      |   1 +
 .../replication/PartitionReplicaListenerTest.java  | 461 ++++++++++++++++-----
 .../apache/ignite/internal/tx/TxManagerTest.java   |   2 +-
 15 files changed, 614 insertions(+), 182 deletions(-)

diff --git a/modules/core/src/main/java/org/apache/ignite/internal/Hacks.java b/modules/core/src/main/java/org/apache/ignite/internal/Kludges.java
similarity index 90%
rename from modules/core/src/main/java/org/apache/ignite/internal/Hacks.java
rename to modules/core/src/main/java/org/apache/ignite/internal/Kludges.java
index b9cbdc1498..e1a546d69a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/Hacks.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/Kludges.java
@@ -18,9 +18,9 @@
 package org.apache.ignite.internal;
 
 /**
- * Contains hacks needed for the whole codebase. Should be removed as quickly as possible.
+ * Contains kludges needed for the whole codebase. Should be removed as quickly as possible.
  */
-public class Hacks {
+public class Kludges {
     // TODO: Remove after IGNITE-20499 is fixed.
     /** Name of the property overriding idle safe time propagation period (in milliseconds). */
     public static final String IDLE_SAFE_TIME_PROPAGATION_PERIOD_MILLISECONDS_PROPERTY = "IDLE_SAFE_TIME_PROPAGATION_PERIOD_MILLISECONDS";
diff --git a/modules/core/src/testFixtures/java/org/apache/ignite/internal/testframework/BaseIgniteAbstractTest.java b/modules/core/src/testFixtures/java/org/apache/ignite/internal/testframework/BaseIgniteAbstractTest.java
index 98221daeed..7ab1133e35 100644
--- a/modules/core/src/testFixtures/java/org/apache/ignite/internal/testframework/BaseIgniteAbstractTest.java
+++ b/modules/core/src/testFixtures/java/org/apache/ignite/internal/testframework/BaseIgniteAbstractTest.java
@@ -17,7 +17,7 @@
 
 package org.apache.ignite.internal.testframework;
 
-import static org.apache.ignite.internal.Hacks.IDLE_SAFE_TIME_PROPAGATION_PERIOD_MILLISECONDS_PROPERTY;
+import static org.apache.ignite.internal.Kludges.IDLE_SAFE_TIME_PROPAGATION_PERIOD_MILLISECONDS_PROPERTY;
 import static org.apache.ignite.internal.lang.IgniteSystemProperties.IGNITE_SENSITIVE_DATA_LOGGING;
 import static org.apache.ignite.internal.lang.IgniteSystemProperties.getString;
 import static org.apache.ignite.internal.util.IgniteUtils.monotonicMs;
diff --git a/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaManager.java b/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaManager.java
index 91fb9481c6..c9ff648060 100644
--- a/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaManager.java
+++ b/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaManager.java
@@ -19,7 +19,7 @@ package org.apache.ignite.internal.replicator;
 
 import static java.util.concurrent.CompletableFuture.completedFuture;
 import static java.util.stream.Collectors.toSet;
-import static org.apache.ignite.internal.Hacks.IDLE_SAFE_TIME_PROPAGATION_PERIOD_MILLISECONDS_PROPERTY;
+import static org.apache.ignite.internal.Kludges.IDLE_SAFE_TIME_PROPAGATION_PERIOD_MILLISECONDS_PROPERTY;
 import static org.apache.ignite.internal.util.IgniteUtils.shutdownAndAwaitTermination;
 
 import java.io.IOException;
diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/readonly/ItReadOnlyTxInPastTest.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/readonly/ItReadOnlyTxInPastTest.java
index 4b1d651127..fdc00052c3 100644
--- a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/readonly/ItReadOnlyTxInPastTest.java
+++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/readonly/ItReadOnlyTxInPastTest.java
@@ -17,7 +17,7 @@
 
 package org.apache.ignite.internal.readonly;
 
-import static org.apache.ignite.internal.Hacks.IDLE_SAFE_TIME_PROPAGATION_PERIOD_MILLISECONDS_PROPERTY;
+import static org.apache.ignite.internal.Kludges.IDLE_SAFE_TIME_PROPAGATION_PERIOD_MILLISECONDS_PROPERTY;
 import static org.apache.ignite.internal.SessionUtils.executeUpdate;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.Matchers.is;
diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceRecoveryTest.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceRecoveryTest.java
index 20b290dc6a..33800e11db 100644
--- a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceRecoveryTest.java
+++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceRecoveryTest.java
@@ -17,7 +17,7 @@
 
 package org.apache.ignite.internal.rebalance;
 
-import static org.apache.ignite.internal.Hacks.IDLE_SAFE_TIME_PROPAGATION_PERIOD_MILLISECONDS_PROPERTY;
+import static org.apache.ignite.internal.Kludges.IDLE_SAFE_TIME_PROPAGATION_PERIOD_MILLISECONDS_PROPERTY;
 import static org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCondition;
 import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertTrue;
diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java
index 751baf57f3..2b61381301 100644
--- a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java
+++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java
@@ -19,7 +19,7 @@ package org.apache.ignite.internal.runner.app;
 
 import static java.util.concurrent.CompletableFuture.completedFuture;
 import static java.util.concurrent.CompletableFuture.failedFuture;
-import static org.apache.ignite.internal.Hacks.IDLE_SAFE_TIME_PROPAGATION_PERIOD_MILLISECONDS_PROPERTY;
+import static org.apache.ignite.internal.Kludges.IDLE_SAFE_TIME_PROPAGATION_PERIOD_MILLISECONDS_PROPERTY;
 import static org.apache.ignite.internal.testframework.IgniteTestUtils.testNodeName;
 import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
 import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willSucceedFast;
diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/schemasync/ItSchemaSyncSingleNodeTest.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/schemasync/ItSchemaSyncSingleNodeTest.java
index 8df0537d46..a2ec535ba4 100644
--- a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/schemasync/ItSchemaSyncSingleNodeTest.java
+++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/schemasync/ItSchemaSyncSingleNodeTest.java
@@ -37,6 +37,7 @@ import org.apache.ignite.table.Table;
 import org.apache.ignite.table.Tuple;
 import org.apache.ignite.tx.Transaction;
 import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Disabled;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.EnumSource;
@@ -78,9 +79,7 @@ class ItSchemaSyncSingleNodeTest extends ClusterPerTestIntegrationTest {
     @ParameterizedTest
     @EnumSource(Operation.class)
     void readWriteOperationInTxAfterAlteringSchemaOnTargetTableIsRejected(Operation operation) {
-        cluster.doInSession(0, session -> {
-            executeUpdate("CREATE TABLE " + TABLE_NAME + " (id int PRIMARY KEY, val varchar)", session);
-        });
+        createTable();
 
         Table table = node.tables().table(TABLE_NAME);
 
@@ -101,7 +100,7 @@ class ItSchemaSyncSingleNodeTest extends ClusterPerTestIntegrationTest {
             assertThat(
                     ex.getMessage(),
                     containsString(String.format(
-                            "Table schema was updated after the transaction was started [table=%s, startSchema=1, operationSchema=2",
+                            "Table schema was updated after the transaction was started [table=%s, startSchema=1, operationSchema=2]",
                             tableId
                     ))
             );
@@ -121,6 +120,12 @@ class ItSchemaSyncSingleNodeTest extends ClusterPerTestIntegrationTest {
         assertThat(tx.state(), is(TxState.ABORTED));
     }
 
+    private void createTable() {
+        cluster.doInSession(0, session -> {
+            executeUpdate("CREATE TABLE " + TABLE_NAME + " (id int PRIMARY KEY, val varchar)", session);
+        });
+    }
+
     private void alterTable(String tableName) {
         cluster.doInSession(0, session -> {
             executeUpdate("ALTER TABLE " + tableName + " ADD COLUMN added int", session);
@@ -132,10 +137,10 @@ class ItSchemaSyncSingleNodeTest extends ClusterPerTestIntegrationTest {
     }
 
     private void enlistTableInTransaction(Table table, Transaction tx) {
-        executeReadOn(table, tx, cluster);
+        executeRwReadOn(table, tx, cluster);
     }
 
-    private static void executeReadOn(Table table, Transaction tx, Cluster cluster) {
+    private static void executeRwReadOn(Table table, Transaction tx, Cluster cluster) {
         cluster.doInSession(0, session -> {
             executeUpdate("SELECT * FROM " + table.name(), session, tx);
         });
@@ -180,7 +185,7 @@ class ItSchemaSyncSingleNodeTest extends ClusterPerTestIntegrationTest {
         SQL_READ {
             @Override
             void execute(Table table, Transaction tx, Cluster cluster) {
-                executeReadOn(table, tx, cluster);
+                executeRwReadOn(table, tx, cluster);
             }
 
             @Override
@@ -218,4 +223,49 @@ class ItSchemaSyncSingleNodeTest extends ClusterPerTestIntegrationTest {
 
         assertDoesNotThrow(() -> putInTx(table, tx));
     }
+
+    @ParameterizedTest
+    @EnumSource(Operation.class)
+    @Disabled("https://issues.apache.org/jira/browse/IGNITE-20680")
+    void readWriteOperationAfterDroppingTargetTableIsRejected(Operation operation) {
+        createTable();
+
+        Table table = node.tables().table(TABLE_NAME);
+
+        putPreExistingValueTo(table);
+
+        InternalTransaction tx = (InternalTransaction) node.transactions().begin();
+
+        enlistTableInTransaction(table, tx);
+
+        dropTable(TABLE_NAME);
+
+        IgniteException ex;
+
+        int tableId = ((TableImpl) table).tableId();
+
+        if (operation.sql()) {
+            ex = assertThrows(IgniteException.class, () -> operation.execute(table, tx, cluster));
+            assertThat(
+                    ex.getMessage(),
+                    containsString(String.format("Table was dropped [table=%s]", tableId))
+            );
+        } else {
+            ex = assertThrows(IncompatibleSchemaException.class, () -> operation.execute(table, tx, cluster));
+            assertThat(
+                    ex.getMessage(),
+                    is(String.format("Table was dropped [table=%s]", tableId))
+            );
+        }
+
+        assertThat(ex.code(), is(Transactions.TX_INCOMPATIBLE_SCHEMA_ERR));
+
+        assertThat(tx.state(), is(TxState.ABORTED));
+    }
+
+    private void dropTable(String tableName) {
+        cluster.doInSession(0, session -> {
+            executeUpdate("DROP TABLE " + tableName, session);
+        });
+    }
 }
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
index 412c5df1c3..82aa20c5ba 100644
--- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
@@ -950,7 +950,7 @@ public class TableManager implements IgniteTablesInternal, IgniteComponent {
                 txStatePartitionStorage,
                 transactionStateResolver,
                 partitionUpdateHandlers.storageUpdateHandler,
-                new NonHistoricSchemas(schemaManager),
+                new NonHistoricSchemas(schemaManager, schemaSyncService),
                 localNode(),
                 schemaSyncService,
                 catalogService,
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/CompatValidationResult.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/CompatValidationResult.java
index a6b2e54d45..dfb7e30b4a 100644
--- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/CompatValidationResult.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/CompatValidationResult.java
@@ -17,6 +17,8 @@
 
 package org.apache.ignite.internal.table.distributed.replicator;
 
+import org.jetbrains.annotations.Nullable;
+
 /**
  * Result of a schema compatibility validation.
  */
@@ -26,7 +28,7 @@ public class CompatValidationResult {
     private final boolean successful;
     private final int failedTableId;
     private final int fromSchemaVersion;
-    private final int toSchemaVersion;
+    private final @Nullable Integer toSchemaVersion;
 
     /**
      * Returns a successful validation result.
@@ -38,18 +40,29 @@ public class CompatValidationResult {
     }
 
     /**
-     * Creates a validation result denoting validation failure.
+     * Creates a validation result denoting incompatible schema change.
      *
      * @param failedTableId Table which schema change is incompatible.
      * @param fromSchemaVersion Version number of the schema from which an incompatible transition tried to be made.
      * @param toSchemaVersion Version number of the schema to which an incompatible transition tried to be made.
      * @return A validation result for a failure.
      */
-    public static CompatValidationResult failure(int failedTableId, int fromSchemaVersion, int toSchemaVersion) {
+    public static CompatValidationResult incompatibleChange(int failedTableId, int fromSchemaVersion, int toSchemaVersion) {
         return new CompatValidationResult(false, failedTableId, fromSchemaVersion, toSchemaVersion);
     }
 
-    private CompatValidationResult(boolean successful, int failedTableId, int fromSchemaVersion, int toSchemaVersion) {
+    /**
+     * Creates a validation result denoting 'table already dropped when commit is made' situation.
+     *
+     * @param failedTableId ID of the table that was already dropped.
+     * @param fromSchemaVersion Version number of the schema from which an incompatible transition tried to be made.
+     * @return A validation result for a failure.
+     */
+    public static CompatValidationResult tableDropped(int failedTableId, int fromSchemaVersion) {
+        return new CompatValidationResult(false, failedTableId, fromSchemaVersion, null);
+    }
+
+    private CompatValidationResult(boolean successful, int failedTableId, int fromSchemaVersion, @Nullable Integer toSchemaVersion) {
         this.successful = successful;
         this.failedTableId = failedTableId;
         this.fromSchemaVersion = fromSchemaVersion;
@@ -65,6 +78,13 @@ public class CompatValidationResult {
         return successful;
     }
 
+    /**
+     * Returns whether the validation has failed because the table was already dropped at the commit timestamp.
+     */
+    public boolean isTableDropped() {
+        return !successful && toSchemaVersion == null;
+    }
+
     /**
      * Returns ID of the table for which the validation has failed. Should only be called for a failed validation result,
      * otherwise an exception is thrown.
@@ -95,6 +115,7 @@ public class CompatValidationResult {
      */
     public int toSchemaVersion() {
         assert !successful : "Should not be called on a successful result";
+        assert toSchemaVersion != null : "Should not be called when there is no toSchemaVersion";
 
         return toSchemaVersion;
     }
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
index c7b238fcfa..4afcaa4dcf 100644
--- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
@@ -26,7 +26,6 @@ import static java.util.concurrent.CompletableFuture.failedFuture;
 import static java.util.concurrent.TimeUnit.SECONDS;
 import static java.util.stream.Collectors.toList;
 import static java.util.stream.Collectors.toMap;
-import static org.apache.ignite.internal.hlc.HybridTimestamp.hybridTimestamp;
 import static org.apache.ignite.internal.lang.IgniteStringFormatter.format;
 import static org.apache.ignite.internal.tx.TxState.ABANDONED;
 import static org.apache.ignite.internal.tx.TxState.ABORTED;
@@ -466,9 +465,9 @@ public class PartitionReplicaListener implements ReplicaListener {
             }
         }
 
-        CompletableFuture<Void> waitForSchemas = waitForSchemasBeforeReading(request);
-
-        return waitForSchemas.thenCompose(unused -> processOperationRequest(request, isPrimary, senderId));
+        return waitForSchemasBeforeReading(request)
+                .thenCompose(unused -> validateTableExistence(request))
+                .thenCompose(opStartTimestamp -> processOperationRequest(request, isPrimary, senderId, opStartTimestamp));
     }
 
     /**
@@ -493,7 +492,40 @@ public class PartitionReplicaListener implements ReplicaListener {
         return tsToWaitForSchemas == null ? completedFuture(null) : schemaSyncService.waitForMetadataCompleteness(tsToWaitForSchemas);
     }
 
-    private CompletableFuture<?> processOperationRequest(ReplicaRequest request, @Nullable Boolean isPrimary, String senderId) {
+    private CompletableFuture<HybridTimestamp> validateTableExistence(ReplicaRequest request) {
+        HybridTimestamp opStartTs;
+
+        if (request instanceof ReadWriteScanCloseReplicaRequest) {
+            // We don't need to validate a close request for table existence.
+            opStartTs = null;
+        } else if (request instanceof ReadWriteReplicaRequest) {
+            opStartTs = hybridClock.now();
+        } else if (request instanceof ReadOnlyReplicaRequest) {
+            opStartTs = ((ReadOnlyReplicaRequest) request).readTimestamp();
+        } else if (request instanceof ReadOnlyDirectReplicaRequest) {
+            opStartTs = hybridClock.now();
+        } else {
+            opStartTs = null;
+        }
+
+        if (opStartTs == null) {
+            return completedFuture(null);
+        }
+
+        return schemaSyncService.waitForMetadataCompleteness(opStartTs)
+                .thenApply(unused -> {
+                    schemaCompatValidator.failIfTableDoesNotExistAt(opStartTs, tableId());
+
+                    return opStartTs;
+                });
+    }
+
+    private CompletableFuture<?> processOperationRequest(
+            ReplicaRequest request,
+            @Nullable Boolean isPrimary,
+            String senderId,
+            HybridTimestamp opStartTimestamp
+    ) {
         if (request instanceof ReadWriteSingleRowReplicaRequest) {
             var req = (ReadWriteSingleRowReplicaRequest) request;
 
@@ -523,7 +555,7 @@ public class PartitionReplicaListener implements ReplicaListener {
                         if (allElementsAreNull(rows)) {
                             return completedFuture(rows);
                         } else {
-                            return validateAtTimestamp(req.transactionId())
+                            return validateRwReadAgainstSchemaAfterTakingLocks(req.transactionId())
                                     .thenApply(ignored -> rows);
                         }
                     })
@@ -557,9 +589,9 @@ public class PartitionReplicaListener implements ReplicaListener {
         } else if (request instanceof BuildIndexReplicaRequest) {
             return raftClient.run(toBuildIndexCommand((BuildIndexReplicaRequest) request));
         } else if (request instanceof ReadOnlyDirectSingleRowReplicaRequest) {
-            return processReadOnlyDirectSingleEntryAction((ReadOnlyDirectSingleRowReplicaRequest) request);
+            return processReadOnlyDirectSingleEntryAction((ReadOnlyDirectSingleRowReplicaRequest) request, opStartTimestamp);
         } else if (request instanceof ReadOnlyDirectMultiRowReplicaRequest) {
-            return processReadOnlyDirectMultiEntryAction((ReadOnlyDirectMultiRowReplicaRequest) request);
+            return processReadOnlyDirectMultiEntryAction((ReadOnlyDirectMultiRowReplicaRequest) request, opStartTimestamp);
         } else {
             throw new UnsupportedReplicaRequestException(request.getClass());
         }
@@ -1289,10 +1321,16 @@ public class PartitionReplicaListener implements ReplicaListener {
         if (request.commit()) {
             HybridTimestamp commitTimestamp = request.commitTimestamp();
 
-            return schemaCompatValidator.validateForward(txId, enlistedGroups, commitTimestamp)
-                    .thenCompose(validationResult ->
-                            finishAndCleanup(enlistedGroups, validationResult.isSuccessful(), commitTimestamp, txId, txCoordinatorId)
-                                    .thenAccept(unused -> throwIfSchemaValidationOnCommitFailed(validationResult)));
+            return schemaCompatValidator.validateCommit(txId, enlistedGroups, commitTimestamp)
+                    .thenCompose(validationResult -> {
+                        return finishAndCleanup(
+                                enlistedGroups,
+                                validationResult.isSuccessful(),
+                                validationResult.isSuccessful() ? commitTimestamp : null,
+                                txId,
+                                txCoordinatorId
+                        ).thenAccept(unused -> throwIfSchemaValidationOnCommitFailed(validationResult));
+                    });
         } else {
             // Aborting.
             return finishAndCleanup(enlistedGroups, false, null, txId, txCoordinatorId);
@@ -1301,9 +1339,15 @@ public class PartitionReplicaListener implements ReplicaListener {
 
     private static void throwIfSchemaValidationOnCommitFailed(CompatValidationResult validationResult) {
         if (!validationResult.isSuccessful()) {
-            throw new IncompatibleSchemaAbortException("Commit failed because schema "
-                    + validationResult.fromSchemaVersion() + " is not forward-compatible with "
-                    + validationResult.toSchemaVersion() + " for table " + validationResult.failedTableId());
+            if (validationResult.isTableDropped()) {
+                throw new IncompatibleSchemaAbortException(
+                        format("Commit failed because a table was already dropped [tableId={}]", validationResult.failedTableId())
+                );
+            } else {
+                throw new IncompatibleSchemaAbortException("Commit failed because schema "
+                        + validationResult.fromSchemaVersion() + " is not forward-compatible with "
+                        + validationResult.toSchemaVersion() + " for table " + validationResult.failedTableId());
+            }
         }
     }
 
@@ -1416,9 +1460,11 @@ public class PartitionReplicaListener implements ReplicaListener {
             @Nullable HybridTimestamp commitTimestamp,
             String txCoordinatorId
     ) {
-        HybridTimestamp currentTimestamp = hybridClock.now();
+        assert !commit || (commitTimestamp != null);
 
-        return reliableCatalogVersionFor(currentTimestamp)
+        HybridTimestamp tsForCatalogVersion = commit ? commitTimestamp : hybridClock.now();
+
+        return reliableCatalogVersionFor(tsForCatalogVersion)
                 .thenCompose(catalogVersion -> {
                     synchronized (commandProcessingLinearizationMutex) {
                         FinishTxCommandBuilder finishTxCmdBldr = MSG_FACTORY.finishTxCommand()
@@ -1507,9 +1553,9 @@ public class PartitionReplicaListener implements ReplicaListener {
         }
 
         return allOffFuturesExceptionIgnored(txUpdateFutures, request).thenCompose(v -> {
-            long commandTimestamp = hybridClock.nowLong();
+            HybridTimestamp commandTimestamp = hybridClock.now();
 
-            return reliableCatalogVersionFor(hybridTimestamp(commandTimestamp))
+            return reliableCatalogVersionFor(commandTimestamp)
                     .thenCompose(catalogVersion -> {
                         synchronized (commandProcessingLinearizationMutex) {
                             TxCleanupCommand txCleanupCmd = MSG_FACTORY.txCleanupCommand()
@@ -1811,13 +1857,14 @@ public class PartitionReplicaListener implements ReplicaListener {
      * Processes multiple entries direct request for read only transaction.
      *
      * @param request Read only multiple entries request.
+     * @param opStartTimestamp Moment when the operation processing was started in this class.
      * @return Result future.
      */
     private CompletableFuture<List<BinaryRow>> processReadOnlyDirectMultiEntryAction(
-            ReadOnlyDirectMultiRowReplicaRequest request
-    ) {
+            ReadOnlyDirectMultiRowReplicaRequest request,
+            HybridTimestamp opStartTimestamp) {
         List<BinaryTuple> primaryKeys = resolvePks(request.primaryKeys());
-        HybridTimestamp readTimestamp = hybridClock.now();
+        HybridTimestamp readTimestamp = opStartTimestamp;
 
         if (request.requestType() != RequestType.RO_GET_ALL) {
             throw new IgniteInternalException(Replicator.REPLICA_COMMON_ERR,
@@ -1905,7 +1952,7 @@ public class PartitionReplicaListener implements ReplicaListener {
                         return completedFuture(new ReplicaResult(result, null));
                     }
 
-                    return validateOperationAgainstSchema(request.transactionId())
+                    return validateWriteAgainstSchemaAfterTakingLocks(request.transactionId())
                             .thenCompose(catalogVersion -> awaitCleanup(rows, catalogVersion))
                             .thenCompose(
                                     catalogVersion -> applyUpdateAllCommand(
@@ -1975,7 +2022,7 @@ public class PartitionReplicaListener implements ReplicaListener {
                     return allOf(insertLockFuts)
                             .thenCompose(ignored ->
                                     // We are inserting completely new rows - no need to cleanup anything in this case, hence empty times.
-                                    validateOperationAgainstSchema(request.transactionId())
+                                    validateWriteAgainstSchemaAfterTakingLocks(request.transactionId())
                             )
                             .thenCompose(catalogVersion -> applyUpdateAllCommand(
                                             request,
@@ -2036,7 +2083,7 @@ public class PartitionReplicaListener implements ReplicaListener {
                         return completedFuture(new ReplicaResult(null, null));
                     }
 
-                    return validateOperationAgainstSchema(request.transactionId())
+                    return validateWriteAgainstSchemaAfterTakingLocks(request.transactionId())
                             .thenCompose(catalogVersion -> awaitCleanup(rows, catalogVersion))
                             .thenCompose(
                                     catalogVersion -> applyUpdateAllCommand(
@@ -2107,7 +2154,7 @@ public class PartitionReplicaListener implements ReplicaListener {
                                 return completedFuture(result);
                             }
 
-                            return validateAtTimestamp(txId)
+                            return validateRwReadAgainstSchemaAfterTakingLocks(txId)
                                     .thenApply(unused -> new ReplicaResult(result, null));
                         });
             }
@@ -2154,7 +2201,7 @@ public class PartitionReplicaListener implements ReplicaListener {
                         return completedFuture(new ReplicaResult(result, null));
                     }
 
-                    return validateOperationAgainstSchema(request.transactionId())
+                    return validateWriteAgainstSchemaAfterTakingLocks(request.transactionId())
                             .thenCompose(catalogVersion -> awaitCleanup(rows, catalogVersion))
                             .thenCompose(
                                     catalogVersion -> applyUpdateAllCommand(
@@ -2470,11 +2517,15 @@ public class PartitionReplicaListener implements ReplicaListener {
      * Processes single entry direct request for read only transaction.
      *
      * @param request Read only single entry request.
+     * @param opStartTimestamp Moment when the operation processing was started in this class.
      * @return Result future.
      */
-    private CompletableFuture<BinaryRow> processReadOnlyDirectSingleEntryAction(ReadOnlyDirectSingleRowReplicaRequest request) {
+    private CompletableFuture<BinaryRow> processReadOnlyDirectSingleEntryAction(
+            ReadOnlyDirectSingleRowReplicaRequest request,
+            HybridTimestamp opStartTimestamp
+    ) {
         BinaryTuple primaryKey = resolvePk(request.primaryKey());
-        HybridTimestamp readTimestamp = hybridClock.now();
+        HybridTimestamp readTimestamp = opStartTimestamp;
 
         if (request.requestType() != RequestType.RO_GET) {
             throw new IgniteInternalException(Replicator.REPLICA_COMMON_ERR,
@@ -2511,7 +2562,7 @@ public class PartitionReplicaListener implements ReplicaListener {
                                     return completedFuture(new ReplicaResult(false, request.full() ? null : completedFuture(null)));
                                 }
 
-                                return validateOperationAgainstSchema(request.transactionId())
+                                return validateWriteAgainstSchemaAfterTakingLocks(request.transactionId())
                                         .thenCompose(catalogVersion -> awaitCleanup(validatedRowId, catalogVersion))
                                         .thenCompose(
                                                 catalogVersion -> applyUpdateCommand(
@@ -2536,7 +2587,7 @@ public class PartitionReplicaListener implements ReplicaListener {
                     RowId rowId0 = new RowId(partId(), UUID.randomUUID());
 
                     return takeLocksForInsert(searchRow, rowId0, txId)
-                            .thenCompose(rowIdLock -> validateOperationAgainstSchema(request.transactionId())
+                            .thenCompose(rowIdLock -> validateWriteAgainstSchemaAfterTakingLocks(request.transactionId())
                                     .thenCompose(
                                             catalogVersion -> applyUpdateCommand(
                                                     request,
@@ -2567,7 +2618,7 @@ public class PartitionReplicaListener implements ReplicaListener {
                             : takeLocksForUpdate(searchRow, rowId0, txId);
 
                     return lockFut
-                            .thenCompose(rowIdLock -> validateOperationAgainstSchema(request.transactionId())
+                            .thenCompose(rowIdLock -> validateWriteAgainstSchemaAfterTakingLocks(request.transactionId())
                                     .thenCompose(catalogVersion -> awaitCleanup(rowId, catalogVersion))
                                     .thenCompose(
                                             catalogVersion -> applyUpdateCommand(
@@ -2599,7 +2650,7 @@ public class PartitionReplicaListener implements ReplicaListener {
                             : takeLocksForUpdate(searchRow, rowId0, txId);
 
                     return lockFut
-                            .thenCompose(rowIdLock -> validateOperationAgainstSchema(request.transactionId())
+                            .thenCompose(rowIdLock -> validateWriteAgainstSchemaAfterTakingLocks(request.transactionId())
                                     .thenCompose(catalogVersion -> awaitCleanup(rowId, catalogVersion))
                                     .thenCompose(
                                             catalogVersion -> applyUpdateCommand(
@@ -2627,7 +2678,7 @@ public class PartitionReplicaListener implements ReplicaListener {
                     }
 
                     return takeLocksForUpdate(searchRow, rowId, txId)
-                            .thenCompose(rowIdLock -> validateOperationAgainstSchema(request.transactionId())
+                            .thenCompose(rowIdLock -> validateWriteAgainstSchemaAfterTakingLocks(request.transactionId())
                                     .thenCompose(catalogVersion -> awaitCleanup(rowId, catalogVersion))
                                     .thenCompose(
                                             catalogVersion -> applyUpdateCommand(
@@ -2655,7 +2706,7 @@ public class PartitionReplicaListener implements ReplicaListener {
                     }
 
                     return takeLocksForUpdate(searchRow, rowId, txId)
-                            .thenCompose(rowIdLock -> validateOperationAgainstSchema(request.transactionId())
+                            .thenCompose(rowIdLock -> validateWriteAgainstSchemaAfterTakingLocks(request.transactionId())
                                     .thenCompose(catalogVersion -> awaitCleanup(rowId, catalogVersion))
                                     .thenCompose(
                                             catalogVersion -> applyUpdateCommand(
@@ -2706,7 +2757,7 @@ public class PartitionReplicaListener implements ReplicaListener {
                     }
 
                     return takeLocksForGet(rowId, txId)
-                            .thenCompose(ignored -> validateAtTimestamp(txId))
+                            .thenCompose(ignored -> validateRwReadAgainstSchemaAfterTakingLocks(txId))
                             .thenApply(ignored -> new ReplicaResult(row, null));
                 });
             }
@@ -2717,7 +2768,7 @@ public class PartitionReplicaListener implements ReplicaListener {
                     }
 
                     return takeLocksForDelete(row, rowId, txId)
-                            .thenCompose(rowLock -> validateOperationAgainstSchema(request.transactionId()))
+                            .thenCompose(rowLock -> validateWriteAgainstSchemaAfterTakingLocks(request.transactionId()))
                             .thenCompose(catalogVersion -> awaitCleanup(rowId, catalogVersion))
                             .thenCompose(
                                     catalogVersion -> applyUpdateCommand(
@@ -2741,7 +2792,7 @@ public class PartitionReplicaListener implements ReplicaListener {
                     }
 
                     return takeLocksForDelete(row, rowId, txId)
-                            .thenCompose(ignored -> validateOperationAgainstSchema(request.transactionId()))
+                            .thenCompose(ignored -> validateWriteAgainstSchemaAfterTakingLocks(request.transactionId()))
                             .thenCompose(catalogVersion -> awaitCleanup(rowId, catalogVersion))
                             .thenCompose(
                                     catalogVersion -> applyUpdateCommand(
@@ -2985,7 +3036,7 @@ public class PartitionReplicaListener implements ReplicaListener {
                                 return completedFuture(new ReplicaResult(false, null));
                             }
 
-                            return validateOperationAgainstSchema(txId)
+                            return validateWriteAgainstSchemaAfterTakingLocks(txId)
                                     .thenCompose(catalogVersion -> awaitCleanup(rowIdLock.get1(), catalogVersion))
                                     .thenCompose(
                                             catalogVersion -> applyUpdateCommand(
@@ -3259,25 +3310,26 @@ public class PartitionReplicaListener implements ReplicaListener {
         return false;
     }
 
-    private CompletableFuture<Void> validateAtTimestamp(UUID txId) {
+    /**
+     * Takes current timestamp and makes schema related validations at this timestamp.
+     *
+     * @param txId Transaction ID.
+     * @return Future that will complete when validation completes.
+     */
+    private CompletableFuture<Void> validateRwReadAgainstSchemaAfterTakingLocks(UUID txId) {
         HybridTimestamp operationTimestamp = hybridClock.now();
 
         return schemaSyncService.waitForMetadataCompleteness(operationTimestamp)
-                .thenApply(unused -> {
-                    failIfSchemaChangedSinceTxStart(txId, operationTimestamp);
-
-                    return null;
-                });
+                .thenRun(() -> failIfSchemaChangedSinceTxStart(txId, operationTimestamp));
     }
 
     /**
-     * Chooses operation timestamp and makes schema related validations. The operation timestamp is only used for validation,
-     * it is NOT sent as safeTime timestamp.
+     * Takes current timestamp and makes schema related validations at this timestamp.
      *
      * @param txId Transaction ID.
      * @return Future that will complete with catalog version associated with given operation though the operation timestamp.
      */
-    private CompletableFuture<Integer> validateOperationAgainstSchema(UUID txId) {
+    private CompletableFuture<Integer> validateWriteAgainstSchemaAfterTakingLocks(UUID txId) {
         HybridTimestamp operationTimestamp = hybridClock.now();
 
         return reliableCatalogVersionFor(operationTimestamp)
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/SchemaCompatValidator.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/SchemaCompatValidator.java
index 9b37043388..90ca7d5d55 100644
--- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/SchemaCompatValidator.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/SchemaCompatValidator.java
@@ -32,7 +32,6 @@ import org.apache.ignite.internal.table.distributed.schema.FullTableSchema;
 import org.apache.ignite.internal.table.distributed.schema.Schemas;
 import org.apache.ignite.internal.table.distributed.schema.TableDefinitionDiff;
 import org.apache.ignite.internal.tx.TransactionIds;
-import org.jetbrains.annotations.Nullable;
 
 /**
  * Validates schema compatibility.
@@ -48,19 +47,19 @@ class SchemaCompatValidator {
     }
 
     /**
-     * Performs commit forward compatibility validation. That is, for each table enlisted in the transaction, checks to see whether the
-     * initial schema (identified by the begin timestamp) is forward-compatible with the commit schema (identified by the commit
-     * timestamp).
+     * Performs commit validation. That is, checks that each table enlisted in the transaction still exists at the commit timestamp,
+     * and that the initial schema of the table (identified by the begin timestamp) is forward-compatible with the commit schema
+     * (identified by the commit timestamp).
      *
      * @param txId ID of the transaction that gets validated.
      * @param enlistedGroupIds IDs of the partitions that are enlisted with the transaction.
-     * @param commitTimestamp Commit timestamp (or {@code null} if it's an abort).
+     * @param commitTimestamp Commit timestamp.
      * @return Future of validation result.
      */
-    CompletableFuture<CompatValidationResult> validateForward(
+    CompletableFuture<CompatValidationResult> validateCommit(
             UUID txId,
             Collection<TablePartitionId> enlistedGroupIds,
-            @Nullable HybridTimestamp commitTimestamp
+            HybridTimestamp commitTimestamp
     ) {
         HybridTimestamp beginTimestamp = TransactionIds.beginTimestamp(txId);
 
@@ -68,23 +67,18 @@ class SchemaCompatValidator {
                 .map(TablePartitionId::tableId)
                 .collect(toSet());
 
-        assert commitTimestamp != null;
         // Using compareTo() instead of after()/begin() because the latter methods take clock skew into account
         // which only makes sense when comparing 'unrelated' timestamps. beginTs and commitTs have a causal relationship,
         // so we don't need to account for clock skew.
         assert commitTimestamp.compareTo(beginTimestamp) > 0;
 
         return schemas.waitForSchemasAvailability(commitTimestamp)
-                .thenApply(ignored -> validateForwardSchemasCompatibility(tableIds, commitTimestamp, beginTimestamp));
+                .thenApply(ignored -> validateCommit(tableIds, commitTimestamp, beginTimestamp));
     }
 
-    private CompatValidationResult validateForwardSchemasCompatibility(
-            Set<Integer> tableIds,
-            HybridTimestamp commitTimestamp,
-            HybridTimestamp beginTimestamp
-    ) {
+    private CompatValidationResult validateCommit(Set<Integer> tableIds, HybridTimestamp commitTimestamp, HybridTimestamp beginTimestamp) {
         for (int tableId : tableIds) {
-            CompatValidationResult validationResult = validateForwardSchemaCompatibility(beginTimestamp, commitTimestamp, tableId);
+            CompatValidationResult validationResult = validateCommit(beginTimestamp, commitTimestamp, tableId);
 
             if (!validationResult.isSuccessful()) {
                 return validationResult;
@@ -94,6 +88,29 @@ class SchemaCompatValidator {
         return CompatValidationResult.success();
     }
 
+    private CompatValidationResult validateCommit(HybridTimestamp beginTimestamp, HybridTimestamp commitTimestamp, int tableId) {
+        CatalogTableDescriptor tableAtCommitTs = catalogService.table(tableId, commitTimestamp.longValue());
+
+        if (tableAtCommitTs == null) {
+            CatalogTableDescriptor tableAtTxStart = catalogService.table(tableId, beginTimestamp.longValue());
+            assert tableAtTxStart != null : "No table " + tableId + " at ts " + beginTimestamp;
+
+            return CompatValidationResult.tableDropped(tableId, tableAtTxStart.schemaId());
+        }
+
+        return validateForwardSchemaCompatibility(beginTimestamp, commitTimestamp, tableId);
+    }
+
+    /**
+     * Performs forward compatibility validation. That is, for the given table, checks to see whether the
+     * initial schema (identified by the begin timestamp) is forward-compatible with the commit schema (identified by the commit
+     * timestamp).
+     *
+     * @param beginTimestamp Begin timestamp of a transaction.
+     * @param commitTimestamp Commit timestamp.
+     * @param tableId ID of the table that is under validation.
+     * @return Validation result.
+     */
     private CompatValidationResult validateForwardSchemaCompatibility(
             HybridTimestamp beginTimestamp,
             HybridTimestamp commitTimestamp,
@@ -107,7 +124,7 @@ class SchemaCompatValidator {
             FullTableSchema oldSchema = tableSchemas.get(i);
             FullTableSchema newSchema = tableSchemas.get(i + 1);
             if (!isForwardCompatible(oldSchema, newSchema)) {
-                return CompatValidationResult.failure(tableId, oldSchema.schemaVersion(), newSchema.schemaVersion());
+                return CompatValidationResult.incompatibleChange(tableId, oldSchema.schemaVersion(), newSchema.schemaVersion());
             }
         }
 
@@ -160,7 +177,7 @@ class SchemaCompatValidator {
             FullTableSchema oldSchema = tableSchemas.get(i);
             FullTableSchema newSchema = tableSchemas.get(i + 1);
             if (!isBackwardCompatible(oldSchema, newSchema)) {
-                return CompatValidationResult.failure(tableId, oldSchema.schemaVersion(), newSchema.schemaVersion());
+                return CompatValidationResult.incompatibleChange(tableId, oldSchema.schemaVersion(), newSchema.schemaVersion());
             }
         }
 
@@ -178,7 +195,10 @@ class SchemaCompatValidator {
         CatalogTableDescriptor tableAtOpTs = catalogService.table(tableId, operationTimestamp.longValue());
 
         assert tableAtBeginTs != null;
-        assert tableAtOpTs != null;
+
+        if (tableAtOpTs == null) {
+            throw tableWasDroppedException(tableId);
+        }
 
         if (tableAtOpTs.tableVersion() != tableAtBeginTs.tableVersion()) {
             throw new IncompatibleSchemaException(
@@ -189,4 +209,16 @@ class SchemaCompatValidator {
             );
         }
     }
+
+    private static IncompatibleSchemaException tableWasDroppedException(int tableId) {
+        return new IncompatibleSchemaException(String.format("Table was dropped [table=%d]", tableId));
+    }
+
+    void failIfTableDoesNotExistAt(HybridTimestamp operationTimestamp, int tableId) {
+        CatalogTableDescriptor tableAtOpTs = catalogService.table(tableId, operationTimestamp.longValue());
+
+        if (tableAtOpTs == null) {
+            throw tableWasDroppedException(tableId);
+        }
+    }
 }
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/schema/NonHistoricSchemas.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/schema/NonHistoricSchemas.java
index 83558e0f74..c6eb5f38c8 100644
--- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/schema/NonHistoricSchemas.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/schema/NonHistoricSchemas.java
@@ -52,13 +52,16 @@ import org.apache.ignite.internal.type.VarlenNativeType;
 public class NonHistoricSchemas implements Schemas {
     private final SchemaManager schemaManager;
 
-    public NonHistoricSchemas(SchemaManager schemaManager) {
+    private final SchemaSyncService schemaSyncService;
+
+    public NonHistoricSchemas(SchemaManager schemaManager, SchemaSyncService schemaSyncService) {
         this.schemaManager = schemaManager;
+        this.schemaSyncService = schemaSyncService;
     }
 
     @Override
     public CompletableFuture<?> waitForSchemasAvailability(HybridTimestamp ts) {
-        return completedFuture(null);
+        return schemaSyncService.waitForMetadataCompleteness(ts);
     }
 
     @Override
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/schema/SchemaSyncService.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/schema/SchemaSyncService.java
index aaaf4f47c0..b062860842 100644
--- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/schema/SchemaSyncService.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/schema/SchemaSyncService.java
@@ -23,6 +23,7 @@ import org.apache.ignite.internal.hlc.HybridTimestamp;
 /**
  * Implements Schema Synchronization wait logic as defined in IEP-98.
  */
+@SuppressWarnings("InterfaceMayBeAnnotatedFunctional")
 public interface SchemaSyncService {
     /**
      * Waits till metadata (like table/index schemas) is complete for the given timestamp. The 'complete' here means
diff --git a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerTest.java b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerTest.java
index 68b824d48a..eedada14c4 100644
--- a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerTest.java
+++ b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerTest.java
@@ -94,6 +94,7 @@ import org.apache.ignite.internal.raft.service.LeaderWithTerm;
 import org.apache.ignite.internal.raft.service.RaftGroupService;
 import org.apache.ignite.internal.replicator.ReplicaResult;
 import org.apache.ignite.internal.replicator.ReplicaService;
+import org.apache.ignite.internal.replicator.ReplicationGroupId;
 import org.apache.ignite.internal.replicator.TablePartitionId;
 import org.apache.ignite.internal.schema.BinaryRow;
 import org.apache.ignite.internal.schema.BinaryRowConverter;
@@ -134,6 +135,10 @@ import org.apache.ignite.internal.table.distributed.index.IndexUpdateHandler;
 import org.apache.ignite.internal.table.distributed.raft.PartitionDataStorage;
 import org.apache.ignite.internal.table.distributed.replication.request.BinaryRowMessage;
 import org.apache.ignite.internal.table.distributed.replication.request.BinaryTupleMessage;
+import org.apache.ignite.internal.table.distributed.replication.request.ReadOnlyDirectMultiRowReplicaRequest;
+import org.apache.ignite.internal.table.distributed.replication.request.ReadOnlyDirectSingleRowReplicaRequest;
+import org.apache.ignite.internal.table.distributed.replication.request.ReadOnlyMultiRowPkReplicaRequest;
+import org.apache.ignite.internal.table.distributed.replication.request.ReadOnlySingleRowPkReplicaRequest;
 import org.apache.ignite.internal.table.distributed.replication.request.ReadWriteReplicaRequest;
 import org.apache.ignite.internal.table.distributed.replicator.IncompatibleSchemaAbortException;
 import org.apache.ignite.internal.table.distributed.replicator.IncompatibleSchemaException;
@@ -166,7 +171,6 @@ import org.apache.ignite.internal.util.Cursor;
 import org.apache.ignite.internal.util.Lazy;
 import org.apache.ignite.internal.util.PendingComparableValuesTracker;
 import org.apache.ignite.lang.ErrorGroups.Transactions;
-import org.apache.ignite.lang.IgniteException;
 import org.apache.ignite.network.ClusterNode;
 import org.apache.ignite.network.ClusterNodeImpl;
 import org.apache.ignite.network.MessagingService;
@@ -212,6 +216,8 @@ public class PartitionReplicaListenerTest extends IgniteAbstractTest {
 
     private static final int TABLE_ID = 1;
 
+    private static final int ANOTHER_TABLE_ID = 2;
+
     private final Map<UUID, Set<RowId>> pendingRows = new ConcurrentHashMap<>();
 
     /** The storage stores partition data. */
@@ -576,18 +582,41 @@ public class PartitionReplicaListenerTest extends IgniteAbstractTest {
     public void testReadOnlySingleRowReplicaRequestEmptyResult() throws Exception {
         BinaryRow testBinaryKey = nextBinaryKey();
 
-        CompletableFuture<ReplicaResult> fut = partitionReplicaListener.invoke(TABLE_MESSAGES_FACTORY.readOnlySingleRowPkReplicaRequest()
-                .groupId(grpId)
-                .readTimestampLong(clock.nowLong())
-                .primaryKey(testBinaryKey.tupleSlice())
-                .requestType(RequestType.RO_GET)
-                .build(), localNode.id());
+        ByteBuffer pk = testBinaryKey.tupleSlice();
+
+        CompletableFuture<ReplicaResult> fut = doReadOnlySingleGet(pk);
 
         BinaryRow binaryRow = (BinaryRow) fut.get(1, TimeUnit.SECONDS).result();
 
         assertNull(binaryRow);
     }
 
+    private CompletableFuture<ReplicaResult> doReadOnlySingleGet(ByteBuffer pk) {
+        return doReadOnlySingleGet(pk, clock.now());
+    }
+
+    private CompletableFuture<ReplicaResult> doReadOnlySingleGet(ByteBuffer pk, HybridTimestamp readTimestamp) {
+        ReadOnlySingleRowPkReplicaRequest request = TABLE_MESSAGES_FACTORY.readOnlySingleRowPkReplicaRequest()
+                .groupId(grpId)
+                .readTimestampLong(readTimestamp.longValue())
+                .primaryKey(pk)
+                .requestType(RequestType.RO_GET)
+                .build();
+
+        return partitionReplicaListener.invoke(request, localNode.id());
+    }
+
+    private CompletableFuture<ReplicaResult> doReadOnlyDirectSingleGet(ByteBuffer pk) {
+        ReadOnlyDirectSingleRowReplicaRequest request = TABLE_MESSAGES_FACTORY.readOnlyDirectSingleRowReplicaRequest()
+                .groupId(grpId)
+                .primaryKey(pk)
+                .requestType(RequestType.RO_GET)
+                .enlistmentConsistencyToken(1L)
+                .build();
+
+        return partitionReplicaListener.invoke(request, localNode.id());
+    }
+
     @Test
     public void testReadOnlySingleRowReplicaRequestCommittedResult() throws Exception {
         UUID txId = newTxId();
@@ -599,12 +628,7 @@ public class PartitionReplicaListenerTest extends IgniteAbstractTest {
         testMvPartitionStorage.addWrite(rowId, testBinaryRow, txId, TABLE_ID, PART_ID);
         testMvPartitionStorage.commitWrite(rowId, clock.now());
 
-        CompletableFuture<ReplicaResult> fut = partitionReplicaListener.invoke(TABLE_MESSAGES_FACTORY.readOnlySingleRowPkReplicaRequest()
-                .groupId(grpId)
-                .readTimestampLong(clock.nowLong())
-                .primaryKey(testBinaryKey.tupleSlice())
-                .requestType(RequestType.RO_GET)
-                .build(), localNode.id());
+        CompletableFuture<ReplicaResult> fut = doReadOnlySingleGet(testBinaryKey.tupleSlice());
 
         BinaryRow binaryRow = (BinaryRow) fut.get(1, TimeUnit.SECONDS).result();
 
@@ -623,12 +647,7 @@ public class PartitionReplicaListenerTest extends IgniteAbstractTest {
         testMvPartitionStorage.addWrite(rowId, testBinaryRow, txId, TABLE_ID, PART_ID);
         txManager.updateTxMeta(txId, old -> new TxStateMeta(TxState.COMMITED, localNode.id(), clock.now()));
 
-        CompletableFuture<ReplicaResult> fut = partitionReplicaListener.invoke(TABLE_MESSAGES_FACTORY.readOnlySingleRowPkReplicaRequest()
-                .groupId(grpId)
-                .readTimestampLong(clock.nowLong())
-                .primaryKey(testBinaryKey.tupleSlice())
-                .requestType(RequestType.RO_GET)
-                .build(), localNode.id());
+        CompletableFuture<ReplicaResult> fut = doReadOnlySingleGet(testBinaryKey.tupleSlice());
 
         BinaryRow binaryRow = (BinaryRow) fut.get(1, TimeUnit.SECONDS).result();
 
@@ -646,12 +665,7 @@ public class PartitionReplicaListenerTest extends IgniteAbstractTest {
         testMvPartitionStorage.addWrite(rowId, testBinaryRow, txId, TABLE_ID, PART_ID);
         txManager.updateTxMeta(txId, old -> new TxStateMeta(TxState.PENDING, localNode.id(), null));
 
-        CompletableFuture<ReplicaResult> fut = partitionReplicaListener.invoke(TABLE_MESSAGES_FACTORY.readOnlySingleRowPkReplicaRequest()
-                .groupId(grpId)
-                .readTimestampLong(clock.nowLong())
-                .primaryKey(testBinaryKey.tupleSlice())
-                .requestType(RequestType.RO_GET)
-                .build(), localNode.id());
+        CompletableFuture<ReplicaResult> fut = doReadOnlySingleGet(testBinaryKey.tupleSlice());
 
         BinaryRow binaryRow = (BinaryRow) fut.get(1, TimeUnit.SECONDS).result();
 
@@ -670,12 +684,7 @@ public class PartitionReplicaListenerTest extends IgniteAbstractTest {
         testMvPartitionStorage.addWrite(rowId, testBinaryRow, txId, TABLE_ID, PART_ID);
         txManager.updateTxMeta(txId, old -> new TxStateMeta(TxState.ABORTED, localNode.id(), null));
 
-        CompletableFuture<ReplicaResult> fut = partitionReplicaListener.invoke(TABLE_MESSAGES_FACTORY.readOnlySingleRowPkReplicaRequest()
-                .groupId(grpId)
-                .readTimestampLong(clock.nowLong())
-                .primaryKey(testBinaryKey.tupleSlice())
-                .requestType(RequestType.RO_GET)
-                .build(), localNode.id());
+        CompletableFuture<ReplicaResult> fut = doReadOnlySingleGet(testBinaryKey.tupleSlice());
 
         BinaryRow binaryRow = (BinaryRow) fut.get(1, TimeUnit.SECONDS).result();
 
@@ -981,11 +990,11 @@ public class PartitionReplicaListenerTest extends IgniteAbstractTest {
     }
 
     @Test
-    public void testWriteIntentOnPrimaryReplicaInsertUpdateDelete() throws MarshallerException {
+    public void testWriteIntentOnPrimaryReplicaInsertUpdateDelete() {
         UUID txId = newTxId();
 
         BinaryRow testRow = binaryRow(0);
-        BinaryRow testRowPk = kvMarshaller.marshal(new TestKey(0, "k0"));
+        BinaryRow testRowPk = marshalQuietly(new TestKey(0, "k0"), kvMarshaller);
 
         assertThat(doSingleRowRequest(txId, testRow, RequestType.RW_INSERT), willCompleteSuccessfully());
 
@@ -1032,8 +1041,16 @@ public class PartitionReplicaListenerTest extends IgniteAbstractTest {
         cleanup(txId);
     }
 
+    private static <K, V> Row marshalQuietly(K key, KvMarshaller<K, V> marshaller) {
+        try {
+            return marshaller.marshal(key);
+        } catch (MarshallerException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
     @Test
-    public void testWriteIntentOnPrimaryReplicaMultiRowOps() throws MarshallerException {
+    public void testWriteIntentOnPrimaryReplicaMultiRowOps() {
         UUID txId = newTxId();
         BinaryRow row0 = binaryRow(0);
         BinaryRow row1 = binaryRow(1);
@@ -1056,8 +1073,8 @@ public class PartitionReplicaListenerTest extends IgniteAbstractTest {
         checkRowInMvStorage(newRow1, true);
 
         Collection<BinaryRow> newRowPks = List.of(
-                kvMarshaller.marshal(new TestKey(0, "k0")),
-                kvMarshaller.marshal(new TestKey(1, "k1"))
+                marshalQuietly(new TestKey(0, "k0"), kvMarshaller),
+                marshalQuietly(new TestKey(1, "k1"), kvMarshaller)
         );
 
         assertThat(doMultiRowPkRequest(txId, newRowPks, RequestType.RW_DELETE_ALL), willCompleteSuccessfully());
@@ -1099,6 +1116,10 @@ public class PartitionReplicaListenerTest extends IgniteAbstractTest {
     }
 
     private CompletableFuture<?> doSingleRowPkRequest(UUID txId, BinaryRow binaryRow, RequestType requestType) {
+        return doSingleRowPkRequest(txId, binaryRow, requestType, false);
+    }
+
+    private CompletableFuture<?> doSingleRowPkRequest(UUID txId, BinaryRow binaryRow, RequestType requestType, boolean full) {
         return partitionReplicaListener.invoke(TABLE_MESSAGES_FACTORY.readWriteSingleRowPkReplicaRequest()
                         .groupId(grpId)
                         .transactionId(txId)
@@ -1106,6 +1127,7 @@ public class PartitionReplicaListenerTest extends IgniteAbstractTest {
                         .primaryKey(binaryRow.tupleSlice())
                         .term(1L)
                         .commitPartitionId(commitPartitionId())
+                        .full(full)
                         .build(),
                 localNode.id()
         );
@@ -1306,14 +1328,14 @@ public class PartitionReplicaListenerTest extends IgniteAbstractTest {
             @Values(booleans = {false, true}) boolean upsertAfterDelete,
             @Values(booleans = {false, true}) boolean committed,
             @Values(booleans = {false, true}) boolean multiple
-    ) throws MarshallerException {
+    ) {
         BinaryRow br1 = binaryRow(1);
 
-        BinaryRow br1Pk = kvMarshaller.marshal(new TestKey(1, "k" + 1));
+        BinaryRow br1Pk = marshalQuietly(new TestKey(1, "k" + 1), kvMarshaller);
 
         BinaryRow br2 = binaryRow(2);
 
-        BinaryRow br2Pk = kvMarshaller.marshal(new TestKey(2, "k" + 2));
+        BinaryRow br2Pk = marshalQuietly(new TestKey(2, "k" + 2), kvMarshaller);
 
         if (insertFirst) {
             UUID tx0 = newTxId();
@@ -1369,7 +1391,7 @@ public class PartitionReplicaListenerTest extends IgniteAbstractTest {
             List<BinaryRow> expected = committed
                     ? (upsertAfterDelete ? allRows : allRowsButModified)
                     : (insertFirst ? allRows : singletonList((BinaryRow) null));
-            List<BinaryRow> res = roGetAll(allRowsPks, clock.nowLong());
+            List<BinaryRow> res = roGetAll(allRowsPks, clock.now());
 
             assertEquals(allRows.size(), res.size());
 
@@ -1556,13 +1578,13 @@ public class PartitionReplicaListenerTest extends IgniteAbstractTest {
 
     private BinaryRow marshalKeyOrKeyValue(RequestType requestType, TestKey key) {
         try {
-            return RequestTypes.isKeyOnly(requestType) ? kvMarshaller.marshal(key) : kvMarshaller.marshal(key, someValue);
+            return RequestTypes.isKeyOnly(requestType) ? marshalQuietly(key, kvMarshaller) : kvMarshaller.marshal(key, someValue);
         } catch (MarshallerException e) {
             throw new AssertionError(e);
         }
     }
 
-    private void testFailsWhenReadingFromFutureIncompatibleSchema(ListenerInvocation listenerInvocation) {
+    private void testFailsWhenReadingFromFutureIncompatibleSchema(RwListenerInvocation listenerInvocation) {
         UUID targetTxId = transactionIdFor(clock.now());
 
         TestKey key = simulateWriteWithSchemaVersionFromFuture();
@@ -1700,13 +1722,13 @@ public class PartitionReplicaListenerTest extends IgniteAbstractTest {
     }
 
     @Test
-    public void failsWhenFullScanReadsTupleWithIncompatibleSchemaFromFuture() {
+    public void failsWhenScanReadsTupleWithIncompatibleSchemaFromFuture() {
         testFailsWhenReadingFromFutureIncompatibleSchema(
-                (targetTxId, key) -> doRwFullScanRetrieveBatchRequest(targetTxId, false)
+                (targetTxId, key) -> doRwScanRetrieveBatchRequest(targetTxId)
         );
     }
 
-    private CompletableFuture<?> doRwFullScanRetrieveBatchRequest(UUID targetTxId, boolean full) {
+    private CompletableFuture<?> doRwScanRetrieveBatchRequest(UUID targetTxId) {
         return partitionReplicaListener.invoke(
                 TABLE_MESSAGES_FACTORY.readWriteScanRetrieveBatchReplicaRequest()
                         .groupId(grpId)
@@ -1714,7 +1736,32 @@ public class PartitionReplicaListenerTest extends IgniteAbstractTest {
                         .term(1L)
                         .scanId(1)
                         .batchSize(100)
-                        .full(full)
+                        .full(false)
+                        .build(),
+                localNode.id()
+        );
+    }
+
+    private CompletableFuture<?> doRwScanCloseRequest(UUID targetTxId) {
+        return partitionReplicaListener.invoke(
+                TABLE_MESSAGES_FACTORY.readWriteScanCloseReplicaRequest()
+                        .groupId(grpId)
+                        .transactionId(targetTxId)
+                        .term(1L)
+                        .scanId(1)
+                        .build(),
+                localNode.id()
+        );
+    }
+
+    private CompletableFuture<?> doRoScanRetrieveBatchRequest(UUID targetTxId, HybridTimestamp readTimestamp) {
+        return partitionReplicaListener.invoke(
+                TABLE_MESSAGES_FACTORY.readOnlyScanRetrieveBatchReplicaRequest()
+                        .groupId(grpId)
+                        .transactionId(targetTxId)
+                        .scanId(1)
+                        .batchSize(100)
+                        .readTimestampLong(readTimestamp.longValue())
                         .build(),
                 localNode.id()
         );
@@ -1742,7 +1789,7 @@ public class PartitionReplicaListenerTest extends IgniteAbstractTest {
                 .map(Arguments::of);
     }
 
-    private void testWritesAreSuppliedWithRequiredCatalogVersion(RequestType requestType, ListenerInvocation listenerInvocation) {
+    private void testWritesAreSuppliedWithRequiredCatalogVersion(RequestType requestType, RwListenerInvocation listenerInvocation) {
         TestKey key = nextKey();
 
         if (RequestTypes.looksUpFirst(requestType)) {
@@ -1755,16 +1802,11 @@ public class PartitionReplicaListenerTest extends IgniteAbstractTest {
         when(catalogService.activeCatalogVersion(anyLong())).thenReturn(42);
 
         UUID targetTxId = newTxId();
-        HybridTimestamp beginTs = TransactionIds.beginTimestamp(targetTxId);
 
         CompletableFuture<?> future = listenerInvocation.invoke(targetTxId, key);
 
         assertThat(future, willCompleteSuccessfully());
 
-        // Make sure metadata completeness is awaited for (at operation timestamp, that is, later than beginTs).
-        //noinspection ConstantConditions
-        verify(schemaSyncService).waitForMetadataCompleteness(gt(beginTs));
-
         // Make sure catalog required version is filled in the executed update command.
         verify(mockRaftClient, atLeast(1)).run(commandCaptor.capture());
 
@@ -1822,21 +1864,21 @@ public class PartitionReplicaListenerTest extends IgniteAbstractTest {
             boolean onExistingRow,
             boolean full
     ) {
-        ListenerInvocation invocation = null;
+        RwListenerInvocation invocation = null;
 
         if (RequestTypes.isSingleRowRwPkOnly(requestType)) {
             invocation = (targetTxId, key) -> {
-                return doSingleRowPkRequest(targetTxId, marshalKeyOrKeyValue(requestType, key), requestType);
+                return doSingleRowPkRequest(targetTxId, marshalKeyOrKeyValue(requestType, key), requestType, full);
             };
         } else if (RequestTypes.isSingleRowRwFullRow(requestType)) {
             invocation = (targetTxId, key) -> {
-                return doSingleRowRequest(targetTxId, marshalKeyOrKeyValue(requestType, key), requestType);
+                return doSingleRowRequest(targetTxId, marshalKeyOrKeyValue(requestType, key), requestType, full);
             };
         } else {
             fail("Uncovered type: " + requestType);
         }
 
-        testRwOperationsFailIfTableAlteredAfterTxStart(requestType, onExistingRow, invocation);
+        testRwOperationFailsIfTableWasAlteredAfterTxStart(requestType, onExistingRow, invocation);
     }
 
     @SuppressWarnings("unused")
@@ -1851,10 +1893,10 @@ public class PartitionReplicaListenerTest extends IgniteAbstractTest {
                 .filter(RequestTypes::isSingleRowRw);
     }
 
-    private void testRwOperationsFailIfTableAlteredAfterTxStart(
+    private void testRwOperationFailsIfTableWasAlteredAfterTxStart(
             RequestType requestType,
             boolean onExistingRow,
-            ListenerInvocation listenerInvocation
+            RwListenerInvocation listenerInvocation
     ) {
         TestKey key = nextKey();
 
@@ -1865,13 +1907,7 @@ public class PartitionReplicaListenerTest extends IgniteAbstractTest {
         UUID txId = newTxId();
         HybridTimestamp txBeginTs = TransactionIds.beginTimestamp(txId);
 
-        CatalogTableDescriptor tableVersion1 = mock(CatalogTableDescriptor.class);
-        CatalogTableDescriptor tableVersion2 = mock(CatalogTableDescriptor.class);
-        when(tableVersion1.tableVersion()).thenReturn(CURRENT_SCHEMA_VERSION);
-        when(tableVersion2.tableVersion()).thenReturn(NEXT_SCHEMA_VERSION);
-
-        when(catalogService.table(TABLE_ID, txBeginTs.longValue())).thenReturn(tableVersion1);
-        when(catalogService.table(eq(TABLE_ID), gt(txBeginTs.longValue()))).thenReturn(tableVersion2);
+        makeSchemaChangeAfter(txBeginTs);
 
         CompletableFuture<?> future = listenerInvocation.invoke(txId, key);
 
@@ -1894,12 +1930,22 @@ public class PartitionReplicaListenerTest extends IgniteAbstractTest {
         }
     }
 
+    private void makeSchemaChangeAfter(HybridTimestamp txBeginTs) {
+        CatalogTableDescriptor tableVersion1 = mock(CatalogTableDescriptor.class);
+        CatalogTableDescriptor tableVersion2 = mock(CatalogTableDescriptor.class);
+        when(tableVersion1.tableVersion()).thenReturn(CURRENT_SCHEMA_VERSION);
+        when(tableVersion2.tableVersion()).thenReturn(NEXT_SCHEMA_VERSION);
+
+        when(catalogService.table(TABLE_ID, txBeginTs.longValue())).thenReturn(tableVersion1);
+        when(catalogService.table(eq(TABLE_ID), gt(txBeginTs.longValue()))).thenReturn(tableVersion2);
+    }
+
     @CartesianTest
     @CartesianTest.MethodFactory("multiRowRwOperationTypesFactory")
     void multiRowRwOperationsFailIfTableAlteredAfterTxStart(
             RequestType requestType, boolean onExistingRow, boolean full
     ) {
-        ListenerInvocation invocation = null;
+        RwListenerInvocation invocation = null;
 
         if (RequestTypes.isMultipleRowsRwPkOnly(requestType)) {
             invocation = (targetTxId, key) -> {
@@ -1913,7 +1959,7 @@ public class PartitionReplicaListenerTest extends IgniteAbstractTest {
             fail("Uncovered type: " + requestType);
         }
 
-        testRwOperationsFailIfTableAlteredAfterTxStart(requestType, onExistingRow, invocation);
+        testRwOperationFailsIfTableWasAlteredAfterTxStart(requestType, onExistingRow, invocation);
     }
 
     @SuppressWarnings("unused")
@@ -1933,7 +1979,7 @@ public class PartitionReplicaListenerTest extends IgniteAbstractTest {
             @Values(booleans = {false, true}) boolean onExistingRow,
             @Values(booleans = {false, true}) boolean full
     ) {
-        testRwOperationsFailIfTableAlteredAfterTxStart(RequestType.RW_REPLACE, onExistingRow, (targetTxId, key) -> {
+        testRwOperationFailsIfTableWasAlteredAfterTxStart(RequestType.RW_REPLACE, onExistingRow, (targetTxId, key) -> {
             return doReplaceRequest(
                     targetTxId,
                     marshalKeyOrKeyValue(RequestType.RW_REPLACE, key),
@@ -1944,15 +1990,232 @@ public class PartitionReplicaListenerTest extends IgniteAbstractTest {
     }
 
     @CartesianTest
-    void rwScanRequestFailsIfTableAlteredAfterTxStart(
+    void rwScanRequestFailsIfTableAlteredAfterTxStart(@Values(booleans = {false, true}) boolean onExistingRow) {
+        testRwOperationFailsIfTableWasAlteredAfterTxStart(RequestType.RW_SCAN, onExistingRow, (targetTxId, key) -> {
+            return doRwScanRetrieveBatchRequest(targetTxId);
+        });
+    }
+
+    @Test
+    void rwScanCloseRequestSucceedsIfTableAlteredAfterTxStart() {
+        UUID txId = newTxId();
+        HybridTimestamp txBeginTs = TransactionIds.beginTimestamp(txId);
+
+        makeSchemaChangeAfter(txBeginTs);
+
+        CompletableFuture<?> future = doRwScanCloseRequest(txId);
+
+        assertThat(future, willCompleteSuccessfully());
+    }
+
+    @CartesianTest
+    @CartesianTest.MethodFactory("singleRowRwOperationTypesFactory")
+    void singleRowRwOperationsFailIfTableWasDropped(RequestType requestType, boolean onExistingRow, boolean full) {
+        RwListenerInvocation invocation = null;
+
+        if (RequestTypes.isSingleRowRwPkOnly(requestType)) {
+            invocation = (targetTxId, key) -> {
+                return doSingleRowPkRequest(targetTxId, marshalKeyOrKeyValue(requestType, key), requestType, full);
+            };
+        } else if (RequestTypes.isSingleRowRwFullRow(requestType)) {
+            invocation = (targetTxId, key) -> {
+                return doSingleRowRequest(targetTxId, marshalKeyOrKeyValue(requestType, key), requestType, full);
+            };
+        } else {
+            fail("Uncovered type: " + requestType);
+        }
+
+        testRwOperationFailsIfTableWasDropped(onExistingRow, invocation);
+    }
+
+    private void testRwOperationFailsIfTableWasDropped(boolean onExistingRow, RwListenerInvocation listenerInvocation) {
+        TestKey key = nextKey();
+
+        if (onExistingRow) {
+            upsertInNewTxFor(key);
+        }
+
+        UUID txId = newTxId();
+        HybridTimestamp txBeginTs = TransactionIds.beginTimestamp(txId);
+
+        makeTableBeDroppedAfter(txBeginTs);
+
+        CompletableFuture<?> future = listenerInvocation.invoke(txId, key);
+
+        IncompatibleSchemaException ex = assertWillThrowFast(future, IncompatibleSchemaException.class);
+        assertThat(ex.code(), is(Transactions.TX_INCOMPATIBLE_SCHEMA_ERR));
+        assertThat(ex.getMessage(), is("Table was dropped [table=1]"));
+    }
+
+    private void makeTableBeDroppedAfter(HybridTimestamp txBeginTs) {
+        makeTableBeDroppedAfter(txBeginTs, TABLE_ID);
+    }
+
+    private void makeTableBeDroppedAfter(HybridTimestamp txBeginTs, int tableId) {
+        CatalogTableDescriptor tableVersion1 = mock(CatalogTableDescriptor.class);
+        when(tableVersion1.tableVersion()).thenReturn(CURRENT_SCHEMA_VERSION);
+
+        when(catalogService.table(tableId, txBeginTs.longValue())).thenReturn(tableVersion1);
+        when(catalogService.table(eq(tableId), gt(txBeginTs.longValue()))).thenReturn(null);
+    }
+
+    @CartesianTest
+    @CartesianTest.MethodFactory("multiRowRwOperationTypesFactory")
+    void multiRowRwOperationsFailIfTableWasDropped(RequestType requestType, boolean onExistingRow, boolean full) {
+        RwListenerInvocation invocation = null;
+
+        if (RequestTypes.isMultipleRowsRwPkOnly(requestType)) {
+            invocation = (targetTxId, key) -> {
+                return doMultiRowPkRequest(targetTxId, List.of(marshalKeyOrKeyValue(requestType, key)), requestType, full);
+            };
+        } else if (RequestTypes.isMultipleRowsRwFullRows(requestType)) {
+            invocation = (targetTxId, key) -> {
+                return doMultiRowRequest(targetTxId, List.of(marshalKeyOrKeyValue(requestType, key)), requestType, full);
+            };
+        } else {
+            fail("Uncovered type: " + requestType);
+        }
+
+        testRwOperationFailsIfTableWasDropped(onExistingRow, invocation);
+    }
+
+    @CartesianTest
+    void replaceRequestFailsIfTableWasDropped(
             @Values(booleans = {false, true}) boolean onExistingRow,
             @Values(booleans = {false, true}) boolean full
     ) {
-        testRwOperationsFailIfTableAlteredAfterTxStart(RequestType.RW_SCAN, onExistingRow, (targetTxId, key) -> {
-            return doRwFullScanRetrieveBatchRequest(targetTxId, full);
+        testRwOperationFailsIfTableWasDropped(onExistingRow, (targetTxId, key) -> {
+            return doReplaceRequest(
+                    targetTxId,
+                    marshalKeyOrKeyValue(RequestType.RW_REPLACE, key),
+                    marshalKeyOrKeyValue(RequestType.RW_REPLACE, key),
+                    full
+            );
+        });
+    }
+
+    @CartesianTest
+    void rwScanRequestFailsIfTableWasDropped(@Values(booleans = {false, true}) boolean onExistingRow) {
+        testRwOperationFailsIfTableWasDropped(onExistingRow, (targetTxId, key) -> {
+            return doRwScanRetrieveBatchRequest(targetTxId);
+        });
+    }
+
+    @Test
+    void rwScanCloseRequestSucceedsIfTableWasDropped() {
+        UUID txId = newTxId();
+        HybridTimestamp txBeginTs = TransactionIds.beginTimestamp(txId);
+
+        makeTableBeDroppedAfter(txBeginTs);
+
+        CompletableFuture<?> future = doRwScanCloseRequest(txId);
+
+        assertThat(future, willCompleteSuccessfully());
+    }
+
+    @CartesianTest
+    void singleRowRoGetFailsIfTableWasDropped(
+            @Values(booleans = {false, true}) boolean direct,
+            @Values(booleans = {false, true}) boolean onExistingRow
+    ) {
+        testRoOperationFailsIfTableWasDropped(onExistingRow, (targetTxId, readTimestamp, key) -> {
+            if (direct) {
+                return doReadOnlyDirectSingleGet(marshalQuietly(key, kvMarshaller).tupleSlice());
+            } else {
+                return doReadOnlySingleGet(marshalQuietly(key, kvMarshaller).tupleSlice(), readTimestamp);
+            }
+        });
+    }
+
+    private void testRoOperationFailsIfTableWasDropped(boolean onExistingRow, RoListenerInvocation listenerInvocation) {
+        TestKey key = nextKey();
+
+        if (onExistingRow) {
+            upsertInNewTxFor(key);
+        }
+
+        UUID txId = newTxId();
+        HybridTimestamp readTs = clock.now();
+
+        when(catalogService.table(eq(TABLE_ID), anyLong())).thenReturn(null);
+
+        CompletableFuture<?> future = listenerInvocation.invoke(txId, readTs, key);
+
+        IncompatibleSchemaException ex = assertWillThrowFast(future, IncompatibleSchemaException.class);
+        assertThat(ex.code(), is(Transactions.TX_INCOMPATIBLE_SCHEMA_ERR));
+        assertThat(ex.getMessage(), is("Table was dropped [table=1]"));
+    }
+
+    @CartesianTest
+    void multiRowRoGetFailsIfTableWasDropped(
+            @Values(booleans = {false, true}) boolean direct,
+            @Values(booleans = {false, true}) boolean onExistingRow
+    ) {
+        testRoOperationFailsIfTableWasDropped(onExistingRow, (targetTxId, readTimestamp, key) -> {
+            if (direct) {
+                return doReadOnlyDirectMultiGet(List.of(marshalQuietly(key, kvMarshaller)));
+            } else {
+                return doReadOnlyMultiGet(List.of(marshalQuietly(key, kvMarshaller)), readTimestamp);
+            }
         });
     }
 
+    @CartesianTest
+    void roScanRequestFailsIfTableWasDropped(@Values(booleans = {false, true}) boolean onExistingRow) {
+        testRoOperationFailsIfTableWasDropped(onExistingRow, (targetTxId, readTimestamp, key) -> {
+            return doRoScanRetrieveBatchRequest(targetTxId, readTimestamp);
+        });
+    }
+
+    @Test
+    void commitRequestFailsIfCommitPartitionTableWasDropped() {
+        testCommitRequestIfTableWasDropped(grpId, Set.of(grpId), grpId.tableId());
+    }
+
+    @Test
+    void commitRequestFailsIfNonCommitPartitionTableWasDropped() {
+        TablePartitionId anotherPartitionId = new TablePartitionId(ANOTHER_TABLE_ID, 0);
+
+        testCommitRequestIfTableWasDropped(grpId, Set.of(grpId, anotherPartitionId), anotherPartitionId.tableId());
+    }
+
+    private void testCommitRequestIfTableWasDropped(
+            TablePartitionId commitPartitionId,
+            Set<ReplicationGroupId> groups,
+            int tableToBeDroppedId
+    ) {
+        when(schemas.tableSchemaVersionsBetween(anyInt(), any(), any(HybridTimestamp.class)))
+                .thenReturn(List.of(
+                        tableSchema(CURRENT_SCHEMA_VERSION, List.of(nullableColumn("col")))
+                ));
+        when(txManager.cleanup(any(), any(), any(), anyBoolean(), any())).thenReturn(completedFuture(null));
+
+        AtomicReference<Boolean> committed = interceptFinishTxCommand();
+
+        UUID txId = newTxId();
+        HybridTimestamp txBeginTs = TransactionIds.beginTimestamp(txId);
+
+        makeTableBeDroppedAfter(txBeginTs, tableToBeDroppedId);
+
+        CompletableFuture<?> future = partitionReplicaListener.invoke(
+                TX_MESSAGES_FACTORY.txFinishReplicaRequest()
+                        .groupId(commitPartitionId)
+                        .groups(groups)
+                        .txId(txId)
+                        .term(1L)
+                        .commit(true)
+                        .commitTimestampLong(clock.nowLong())
+                        .build(),
+                localNode.id()
+        );
+
+        IncompatibleSchemaAbortException ex = assertWillThrowFast(future, IncompatibleSchemaAbortException.class);
+        assertThat(ex.code(), is(Transactions.TX_COMMIT_ERR));
+        assertThat(ex.getMessage(), is("Commit failed because a table was already dropped [tableId=" + tableToBeDroppedId + "]"));
+
+        assertThat("The transaction must have been aborted", committed.get(), is(false));
+    }
+
     private UUID newTxId() {
         return transactionIdFor(clock.now());
     }
@@ -1996,19 +2259,36 @@ public class PartitionReplicaListenerTest extends IgniteAbstractTest {
         return (BinaryRow) future.join().result();
     }
 
-    private List<BinaryRow> roGetAll(Collection<BinaryRow> rows, long readTimestamp) {
-        CompletableFuture<ReplicaResult> future = partitionReplicaListener.invoke(TABLE_MESSAGES_FACTORY.readOnlyMultiRowPkReplicaRequest()
-                        .groupId(grpId)
-                        .requestType(RequestType.RO_GET_ALL)
-                        .readTimestampLong(readTimestamp)
-                        .primaryKeys(rows.stream().map(BinaryRow::tupleSlice).collect(toList()))
-                        .build(),
-                localNode.id()
-        );
+    private List<BinaryRow> roGetAll(Collection<BinaryRow> rows, HybridTimestamp readTimestamp) {
+        CompletableFuture<ReplicaResult> future = doReadOnlyMultiGet(rows, readTimestamp);
+
+        assertThat(future, willCompleteSuccessfully());
 
         return (List<BinaryRow>) future.join().result();
     }
 
+    private CompletableFuture<ReplicaResult> doReadOnlyMultiGet(Collection<BinaryRow> rows, HybridTimestamp readTimestamp) {
+        ReadOnlyMultiRowPkReplicaRequest request = TABLE_MESSAGES_FACTORY.readOnlyMultiRowPkReplicaRequest()
+                .groupId(grpId)
+                .requestType(RequestType.RO_GET_ALL)
+                .readTimestampLong(readTimestamp.longValue())
+                .primaryKeys(rows.stream().map(BinaryRow::tupleSlice).collect(toList()))
+                .build();
+
+        return partitionReplicaListener.invoke(request, localNode.id());
+    }
+
+    private CompletableFuture<ReplicaResult> doReadOnlyDirectMultiGet(Collection<BinaryRow> rows) {
+        ReadOnlyDirectMultiRowReplicaRequest request = TABLE_MESSAGES_FACTORY.readOnlyDirectMultiRowReplicaRequest()
+                .groupId(grpId)
+                .requestType(RequestType.RO_GET_ALL)
+                .primaryKeys(rows.stream().map(BinaryRow::tupleSlice).collect(toList()))
+                .enlistmentConsistencyToken(1L)
+                .build();
+
+        return partitionReplicaListener.invoke(request, localNode.id());
+    }
+
     private void cleanup(UUID txId) {
         HybridTimestamp commitTs = clock.now();
 
@@ -2046,11 +2326,7 @@ public class PartitionReplicaListenerTest extends IgniteAbstractTest {
     }
 
     private BinaryRow nextBinaryKey() {
-        try {
-            return kvMarshaller.marshal(nextKey());
-        } catch (MarshallerException e) {
-            throw new IgniteException(e);
-        }
+        return marshalQuietly(nextKey(), kvMarshaller);
     }
 
     private static TestKey nextKey() {
@@ -2085,14 +2361,6 @@ public class PartitionReplicaListenerTest extends IgniteAbstractTest {
         }
     }
 
-    private TestValue value(BinaryRow binaryRow) {
-        try {
-            return kvMarshaller.unmarshalValue(Row.wrapBinaryRow(schemaDescriptor, binaryRow));
-        } catch (MarshallerException e) {
-            throw new IgniteException(e);
-        }
-    }
-
     private static BinaryRowMessage binaryRowMessage(BinaryRow binaryRow) {
         return TABLE_MESSAGES_FACTORY.binaryRowMessage()
                 .binaryTuple(binaryRow.tupleSlice())
@@ -2190,7 +2458,12 @@ public class PartitionReplicaListenerTest extends IgniteAbstractTest {
     }
 
     @FunctionalInterface
-    private interface ListenerInvocation {
+    private interface RwListenerInvocation {
         CompletableFuture<?> invoke(UUID targetTxId, TestKey key);
     }
+
+    @FunctionalInterface
+    private interface RoListenerInvocation {
+        CompletableFuture<?> invoke(UUID targetTxId, HybridTimestamp readTimestamp, TestKey key);
+    }
 }
diff --git a/modules/transactions/src/test/java/org/apache/ignite/internal/tx/TxManagerTest.java b/modules/transactions/src/test/java/org/apache/ignite/internal/tx/TxManagerTest.java
index 359e9da571..cd3fd04aa8 100644
--- a/modules/transactions/src/test/java/org/apache/ignite/internal/tx/TxManagerTest.java
+++ b/modules/transactions/src/test/java/org/apache/ignite/internal/tx/TxManagerTest.java
@@ -19,7 +19,7 @@ package org.apache.ignite.internal.tx;
 
 
 import static java.lang.Math.abs;
-import static org.apache.ignite.internal.Hacks.IDLE_SAFE_TIME_PROPAGATION_PERIOD_MILLISECONDS_PROPERTY;
+import static org.apache.ignite.internal.Kludges.IDLE_SAFE_TIME_PROPAGATION_PERIOD_MILLISECONDS_PROPERTY;
 import static org.apache.ignite.internal.hlc.HybridTimestamp.CLOCK_SKEW;
 import static org.apache.ignite.internal.hlc.HybridTimestamp.hybridTimestamp;
 import static org.apache.ignite.internal.replicator.ReplicaManager.idleSafeTimePropagationPeriodMs;