You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by ap...@apache.org on 2023/10/23 08:06:47 UTC
[ignite-3] branch main updated: IGNITE-20042 Check table existence before executing each operation in an RW transaction (#2721)
This is an automated email from the ASF dual-hosted git repository.
apolovtsev pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new 5dc20b82c8 IGNITE-20042 Check table existence before executing each operation in an RW transaction (#2721)
5dc20b82c8 is described below
commit 5dc20b82c8c0c09f66c4c04a98c74cece530d0ef
Author: Roman Puchkovskiy <ro...@gmail.com>
AuthorDate: Mon Oct 23 12:06:41 2023 +0400
IGNITE-20042 Check table existence before executing each operation in an RW transaction (#2721)
---
.../ignite/internal/{Hacks.java => Kludges.java} | 4 +-
.../testframework/BaseIgniteAbstractTest.java | 2 +-
.../ignite/internal/replicator/ReplicaManager.java | 2 +-
.../internal/readonly/ItReadOnlyTxInPastTest.java | 2 +-
.../rebalance/ItRebalanceRecoveryTest.java | 2 +-
.../runner/app/ItIgniteNodeRestartTest.java | 2 +-
.../schemasync/ItSchemaSyncSingleNodeTest.java | 64 ++-
.../internal/table/distributed/TableManager.java | 2 +-
.../replicator/CompatValidationResult.java | 29 +-
.../replicator/PartitionReplicaListener.java | 148 ++++---
.../replicator/SchemaCompatValidator.java | 68 ++-
.../distributed/schema/NonHistoricSchemas.java | 7 +-
.../distributed/schema/SchemaSyncService.java | 1 +
.../replication/PartitionReplicaListenerTest.java | 461 ++++++++++++++++-----
.../apache/ignite/internal/tx/TxManagerTest.java | 2 +-
15 files changed, 614 insertions(+), 182 deletions(-)
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/Hacks.java b/modules/core/src/main/java/org/apache/ignite/internal/Kludges.java
similarity index 90%
rename from modules/core/src/main/java/org/apache/ignite/internal/Hacks.java
rename to modules/core/src/main/java/org/apache/ignite/internal/Kludges.java
index b9cbdc1498..e1a546d69a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/Hacks.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/Kludges.java
@@ -18,9 +18,9 @@
package org.apache.ignite.internal;
/**
- * Contains hacks needed for the whole codebase. Should be removed as quickly as possible.
+ * Contains kludges needed for the whole codebase. Should be removed as quickly as possible.
*/
-public class Hacks {
+public class Kludges {
// TODO: Remove after IGNITE-20499 is fixed.
/** Name of the property overriding idle safe time propagation period (in milliseconds). */
public static final String IDLE_SAFE_TIME_PROPAGATION_PERIOD_MILLISECONDS_PROPERTY = "IDLE_SAFE_TIME_PROPAGATION_PERIOD_MILLISECONDS";
diff --git a/modules/core/src/testFixtures/java/org/apache/ignite/internal/testframework/BaseIgniteAbstractTest.java b/modules/core/src/testFixtures/java/org/apache/ignite/internal/testframework/BaseIgniteAbstractTest.java
index 98221daeed..7ab1133e35 100644
--- a/modules/core/src/testFixtures/java/org/apache/ignite/internal/testframework/BaseIgniteAbstractTest.java
+++ b/modules/core/src/testFixtures/java/org/apache/ignite/internal/testframework/BaseIgniteAbstractTest.java
@@ -17,7 +17,7 @@
package org.apache.ignite.internal.testframework;
-import static org.apache.ignite.internal.Hacks.IDLE_SAFE_TIME_PROPAGATION_PERIOD_MILLISECONDS_PROPERTY;
+import static org.apache.ignite.internal.Kludges.IDLE_SAFE_TIME_PROPAGATION_PERIOD_MILLISECONDS_PROPERTY;
import static org.apache.ignite.internal.lang.IgniteSystemProperties.IGNITE_SENSITIVE_DATA_LOGGING;
import static org.apache.ignite.internal.lang.IgniteSystemProperties.getString;
import static org.apache.ignite.internal.util.IgniteUtils.monotonicMs;
diff --git a/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaManager.java b/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaManager.java
index 91fb9481c6..c9ff648060 100644
--- a/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaManager.java
+++ b/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaManager.java
@@ -19,7 +19,7 @@ package org.apache.ignite.internal.replicator;
import static java.util.concurrent.CompletableFuture.completedFuture;
import static java.util.stream.Collectors.toSet;
-import static org.apache.ignite.internal.Hacks.IDLE_SAFE_TIME_PROPAGATION_PERIOD_MILLISECONDS_PROPERTY;
+import static org.apache.ignite.internal.Kludges.IDLE_SAFE_TIME_PROPAGATION_PERIOD_MILLISECONDS_PROPERTY;
import static org.apache.ignite.internal.util.IgniteUtils.shutdownAndAwaitTermination;
import java.io.IOException;
diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/readonly/ItReadOnlyTxInPastTest.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/readonly/ItReadOnlyTxInPastTest.java
index 4b1d651127..fdc00052c3 100644
--- a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/readonly/ItReadOnlyTxInPastTest.java
+++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/readonly/ItReadOnlyTxInPastTest.java
@@ -17,7 +17,7 @@
package org.apache.ignite.internal.readonly;
-import static org.apache.ignite.internal.Hacks.IDLE_SAFE_TIME_PROPAGATION_PERIOD_MILLISECONDS_PROPERTY;
+import static org.apache.ignite.internal.Kludges.IDLE_SAFE_TIME_PROPAGATION_PERIOD_MILLISECONDS_PROPERTY;
import static org.apache.ignite.internal.SessionUtils.executeUpdate;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.is;
diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceRecoveryTest.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceRecoveryTest.java
index 20b290dc6a..33800e11db 100644
--- a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceRecoveryTest.java
+++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceRecoveryTest.java
@@ -17,7 +17,7 @@
package org.apache.ignite.internal.rebalance;
-import static org.apache.ignite.internal.Hacks.IDLE_SAFE_TIME_PROPAGATION_PERIOD_MILLISECONDS_PROPERTY;
+import static org.apache.ignite.internal.Kludges.IDLE_SAFE_TIME_PROPAGATION_PERIOD_MILLISECONDS_PROPERTY;
import static org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCondition;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;
diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java
index 751baf57f3..2b61381301 100644
--- a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java
+++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java
@@ -19,7 +19,7 @@ package org.apache.ignite.internal.runner.app;
import static java.util.concurrent.CompletableFuture.completedFuture;
import static java.util.concurrent.CompletableFuture.failedFuture;
-import static org.apache.ignite.internal.Hacks.IDLE_SAFE_TIME_PROPAGATION_PERIOD_MILLISECONDS_PROPERTY;
+import static org.apache.ignite.internal.Kludges.IDLE_SAFE_TIME_PROPAGATION_PERIOD_MILLISECONDS_PROPERTY;
import static org.apache.ignite.internal.testframework.IgniteTestUtils.testNodeName;
import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willSucceedFast;
diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/schemasync/ItSchemaSyncSingleNodeTest.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/schemasync/ItSchemaSyncSingleNodeTest.java
index 8df0537d46..a2ec535ba4 100644
--- a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/schemasync/ItSchemaSyncSingleNodeTest.java
+++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/schemasync/ItSchemaSyncSingleNodeTest.java
@@ -37,6 +37,7 @@ import org.apache.ignite.table.Table;
import org.apache.ignite.table.Tuple;
import org.apache.ignite.tx.Transaction;
import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.EnumSource;
@@ -78,9 +79,7 @@ class ItSchemaSyncSingleNodeTest extends ClusterPerTestIntegrationTest {
@ParameterizedTest
@EnumSource(Operation.class)
void readWriteOperationInTxAfterAlteringSchemaOnTargetTableIsRejected(Operation operation) {
- cluster.doInSession(0, session -> {
- executeUpdate("CREATE TABLE " + TABLE_NAME + " (id int PRIMARY KEY, val varchar)", session);
- });
+ createTable();
Table table = node.tables().table(TABLE_NAME);
@@ -101,7 +100,7 @@ class ItSchemaSyncSingleNodeTest extends ClusterPerTestIntegrationTest {
assertThat(
ex.getMessage(),
containsString(String.format(
- "Table schema was updated after the transaction was started [table=%s, startSchema=1, operationSchema=2",
+ "Table schema was updated after the transaction was started [table=%s, startSchema=1, operationSchema=2]",
tableId
))
);
@@ -121,6 +120,12 @@ class ItSchemaSyncSingleNodeTest extends ClusterPerTestIntegrationTest {
assertThat(tx.state(), is(TxState.ABORTED));
}
+ private void createTable() {
+ cluster.doInSession(0, session -> {
+ executeUpdate("CREATE TABLE " + TABLE_NAME + " (id int PRIMARY KEY, val varchar)", session);
+ });
+ }
+
private void alterTable(String tableName) {
cluster.doInSession(0, session -> {
executeUpdate("ALTER TABLE " + tableName + " ADD COLUMN added int", session);
@@ -132,10 +137,10 @@ class ItSchemaSyncSingleNodeTest extends ClusterPerTestIntegrationTest {
}
private void enlistTableInTransaction(Table table, Transaction tx) {
- executeReadOn(table, tx, cluster);
+ executeRwReadOn(table, tx, cluster);
}
- private static void executeReadOn(Table table, Transaction tx, Cluster cluster) {
+ private static void executeRwReadOn(Table table, Transaction tx, Cluster cluster) {
cluster.doInSession(0, session -> {
executeUpdate("SELECT * FROM " + table.name(), session, tx);
});
@@ -180,7 +185,7 @@ class ItSchemaSyncSingleNodeTest extends ClusterPerTestIntegrationTest {
SQL_READ {
@Override
void execute(Table table, Transaction tx, Cluster cluster) {
- executeReadOn(table, tx, cluster);
+ executeRwReadOn(table, tx, cluster);
}
@Override
@@ -218,4 +223,49 @@ class ItSchemaSyncSingleNodeTest extends ClusterPerTestIntegrationTest {
assertDoesNotThrow(() -> putInTx(table, tx));
}
+
+ @ParameterizedTest
+ @EnumSource(Operation.class)
+ @Disabled("https://issues.apache.org/jira/browse/IGNITE-20680")
+ void readWriteOperationAfterDroppingTargetTableIsRejected(Operation operation) {
+ createTable();
+
+ Table table = node.tables().table(TABLE_NAME);
+
+ putPreExistingValueTo(table);
+
+ InternalTransaction tx = (InternalTransaction) node.transactions().begin();
+
+ enlistTableInTransaction(table, tx);
+
+ dropTable(TABLE_NAME);
+
+ IgniteException ex;
+
+ int tableId = ((TableImpl) table).tableId();
+
+ if (operation.sql()) {
+ ex = assertThrows(IgniteException.class, () -> operation.execute(table, tx, cluster));
+ assertThat(
+ ex.getMessage(),
+ containsString(String.format("Table was dropped [table=%s]", tableId))
+ );
+ } else {
+ ex = assertThrows(IncompatibleSchemaException.class, () -> operation.execute(table, tx, cluster));
+ assertThat(
+ ex.getMessage(),
+ is(String.format("Table was dropped [table=%s]", tableId))
+ );
+ }
+
+ assertThat(ex.code(), is(Transactions.TX_INCOMPATIBLE_SCHEMA_ERR));
+
+ assertThat(tx.state(), is(TxState.ABORTED));
+ }
+
+ private void dropTable(String tableName) {
+ cluster.doInSession(0, session -> {
+ executeUpdate("DROP TABLE " + tableName, session);
+ });
+ }
}
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
index 412c5df1c3..82aa20c5ba 100644
--- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
@@ -950,7 +950,7 @@ public class TableManager implements IgniteTablesInternal, IgniteComponent {
txStatePartitionStorage,
transactionStateResolver,
partitionUpdateHandlers.storageUpdateHandler,
- new NonHistoricSchemas(schemaManager),
+ new NonHistoricSchemas(schemaManager, schemaSyncService),
localNode(),
schemaSyncService,
catalogService,
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/CompatValidationResult.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/CompatValidationResult.java
index a6b2e54d45..dfb7e30b4a 100644
--- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/CompatValidationResult.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/CompatValidationResult.java
@@ -17,6 +17,8 @@
package org.apache.ignite.internal.table.distributed.replicator;
+import org.jetbrains.annotations.Nullable;
+
/**
* Result of a schema compatibility validation.
*/
@@ -26,7 +28,7 @@ public class CompatValidationResult {
private final boolean successful;
private final int failedTableId;
private final int fromSchemaVersion;
- private final int toSchemaVersion;
+ private final @Nullable Integer toSchemaVersion;
/**
* Returns a successful validation result.
@@ -38,18 +40,29 @@ public class CompatValidationResult {
}
/**
- * Creates a validation result denoting validation failure.
+ * Creates a validation result denoting incompatible schema change.
*
* @param failedTableId Table which schema change is incompatible.
* @param fromSchemaVersion Version number of the schema from which an incompatible transition tried to be made.
* @param toSchemaVersion Version number of the schema to which an incompatible transition tried to be made.
* @return A validation result for a failure.
*/
- public static CompatValidationResult failure(int failedTableId, int fromSchemaVersion, int toSchemaVersion) {
+ public static CompatValidationResult incompatibleChange(int failedTableId, int fromSchemaVersion, int toSchemaVersion) {
return new CompatValidationResult(false, failedTableId, fromSchemaVersion, toSchemaVersion);
}
- private CompatValidationResult(boolean successful, int failedTableId, int fromSchemaVersion, int toSchemaVersion) {
+ /**
+ * Creates a validation result denoting 'table already dropped when commit is made' situation.
+ *
+ * @param failedTableId Table which schema change is incompatible.
+ * @param fromSchemaVersion Version number of the schema from which an incompatible transition tried to be made.
+ * @return A validation result for a failure.
+ */
+ public static CompatValidationResult tableDropped(int failedTableId, int fromSchemaVersion) {
+ return new CompatValidationResult(false, failedTableId, fromSchemaVersion, null);
+ }
+
+ private CompatValidationResult(boolean successful, int failedTableId, int fromSchemaVersion, @Nullable Integer toSchemaVersion) {
this.successful = successful;
this.failedTableId = failedTableId;
this.fromSchemaVersion = fromSchemaVersion;
@@ -65,6 +78,13 @@ public class CompatValidationResult {
return successful;
}
+ /**
+ * Returns whether the validation has failed due to table was already dropped at the commit timestamp.
+ */
+ public boolean isTableDropped() {
+ return !successful && toSchemaVersion == null;
+ }
+
/**
* Returns ID of the table for which the validation has failed. Should only be called for a failed validation result,
* otherwise an exception is thrown.
@@ -95,6 +115,7 @@ public class CompatValidationResult {
*/
public int toSchemaVersion() {
assert !successful : "Should not be called on a successful result";
+ assert toSchemaVersion != null : "Should not be called when there is no toSchemaVersion";
return toSchemaVersion;
}
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
index c7b238fcfa..4afcaa4dcf 100644
--- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
@@ -26,7 +26,6 @@ import static java.util.concurrent.CompletableFuture.failedFuture;
import static java.util.concurrent.TimeUnit.SECONDS;
import static java.util.stream.Collectors.toList;
import static java.util.stream.Collectors.toMap;
-import static org.apache.ignite.internal.hlc.HybridTimestamp.hybridTimestamp;
import static org.apache.ignite.internal.lang.IgniteStringFormatter.format;
import static org.apache.ignite.internal.tx.TxState.ABANDONED;
import static org.apache.ignite.internal.tx.TxState.ABORTED;
@@ -466,9 +465,9 @@ public class PartitionReplicaListener implements ReplicaListener {
}
}
- CompletableFuture<Void> waitForSchemas = waitForSchemasBeforeReading(request);
-
- return waitForSchemas.thenCompose(unused -> processOperationRequest(request, isPrimary, senderId));
+ return waitForSchemasBeforeReading(request)
+ .thenCompose(unused -> validateTableExistence(request))
+ .thenCompose(opStartTimestamp -> processOperationRequest(request, isPrimary, senderId, opStartTimestamp));
}
/**
@@ -493,7 +492,40 @@ public class PartitionReplicaListener implements ReplicaListener {
return tsToWaitForSchemas == null ? completedFuture(null) : schemaSyncService.waitForMetadataCompleteness(tsToWaitForSchemas);
}
- private CompletableFuture<?> processOperationRequest(ReplicaRequest request, @Nullable Boolean isPrimary, String senderId) {
+ private CompletableFuture<HybridTimestamp> validateTableExistence(ReplicaRequest request) {
+ HybridTimestamp opStartTs;
+
+ if (request instanceof ReadWriteScanCloseReplicaRequest) {
+ // We don't need to validate close request for table existence.
+ opStartTs = null;
+ } else if (request instanceof ReadWriteReplicaRequest) {
+ opStartTs = hybridClock.now();
+ } else if (request instanceof ReadOnlyReplicaRequest) {
+ opStartTs = ((ReadOnlyReplicaRequest) request).readTimestamp();
+ } else if (request instanceof ReadOnlyDirectReplicaRequest) {
+ opStartTs = hybridClock.now();
+ } else {
+ opStartTs = null;
+ }
+
+ if (opStartTs == null) {
+ return completedFuture(null);
+ }
+
+ return schemaSyncService.waitForMetadataCompleteness(opStartTs)
+ .thenApply(unused -> {
+ schemaCompatValidator.failIfTableDoesNotExistAt(opStartTs, tableId());
+
+ return opStartTs;
+ });
+ }
+
+ private CompletableFuture<?> processOperationRequest(
+ ReplicaRequest request,
+ @Nullable Boolean isPrimary,
+ String senderId,
+ HybridTimestamp opStartTimestamp
+ ) {
if (request instanceof ReadWriteSingleRowReplicaRequest) {
var req = (ReadWriteSingleRowReplicaRequest) request;
@@ -523,7 +555,7 @@ public class PartitionReplicaListener implements ReplicaListener {
if (allElementsAreNull(rows)) {
return completedFuture(rows);
} else {
- return validateAtTimestamp(req.transactionId())
+ return validateRwReadAgainstSchemaAfterTakingLocks(req.transactionId())
.thenApply(ignored -> rows);
}
})
@@ -557,9 +589,9 @@ public class PartitionReplicaListener implements ReplicaListener {
} else if (request instanceof BuildIndexReplicaRequest) {
return raftClient.run(toBuildIndexCommand((BuildIndexReplicaRequest) request));
} else if (request instanceof ReadOnlyDirectSingleRowReplicaRequest) {
- return processReadOnlyDirectSingleEntryAction((ReadOnlyDirectSingleRowReplicaRequest) request);
+ return processReadOnlyDirectSingleEntryAction((ReadOnlyDirectSingleRowReplicaRequest) request, opStartTimestamp);
} else if (request instanceof ReadOnlyDirectMultiRowReplicaRequest) {
- return processReadOnlyDirectMultiEntryAction((ReadOnlyDirectMultiRowReplicaRequest) request);
+ return processReadOnlyDirectMultiEntryAction((ReadOnlyDirectMultiRowReplicaRequest) request, opStartTimestamp);
} else {
throw new UnsupportedReplicaRequestException(request.getClass());
}
@@ -1289,10 +1321,16 @@ public class PartitionReplicaListener implements ReplicaListener {
if (request.commit()) {
HybridTimestamp commitTimestamp = request.commitTimestamp();
- return schemaCompatValidator.validateForward(txId, enlistedGroups, commitTimestamp)
- .thenCompose(validationResult ->
- finishAndCleanup(enlistedGroups, validationResult.isSuccessful(), commitTimestamp, txId, txCoordinatorId)
- .thenAccept(unused -> throwIfSchemaValidationOnCommitFailed(validationResult)));
+ return schemaCompatValidator.validateCommit(txId, enlistedGroups, commitTimestamp)
+ .thenCompose(validationResult -> {
+ return finishAndCleanup(
+ enlistedGroups,
+ validationResult.isSuccessful(),
+ validationResult.isSuccessful() ? commitTimestamp : null,
+ txId,
+ txCoordinatorId
+ ).thenAccept(unused -> throwIfSchemaValidationOnCommitFailed(validationResult));
+ });
} else {
// Aborting.
return finishAndCleanup(enlistedGroups, false, null, txId, txCoordinatorId);
@@ -1301,9 +1339,15 @@ public class PartitionReplicaListener implements ReplicaListener {
private static void throwIfSchemaValidationOnCommitFailed(CompatValidationResult validationResult) {
if (!validationResult.isSuccessful()) {
- throw new IncompatibleSchemaAbortException("Commit failed because schema "
- + validationResult.fromSchemaVersion() + " is not forward-compatible with "
- + validationResult.toSchemaVersion() + " for table " + validationResult.failedTableId());
+ if (validationResult.isTableDropped()) {
+ throw new IncompatibleSchemaAbortException(
+ format("Commit failed because a table was already dropped [tableId={}]", validationResult.failedTableId())
+ );
+ } else {
+ throw new IncompatibleSchemaAbortException("Commit failed because schema "
+ + validationResult.fromSchemaVersion() + " is not forward-compatible with "
+ + validationResult.toSchemaVersion() + " for table " + validationResult.failedTableId());
+ }
}
}
@@ -1416,9 +1460,11 @@ public class PartitionReplicaListener implements ReplicaListener {
@Nullable HybridTimestamp commitTimestamp,
String txCoordinatorId
) {
- HybridTimestamp currentTimestamp = hybridClock.now();
+ assert !commit || (commitTimestamp != null);
- return reliableCatalogVersionFor(currentTimestamp)
+ HybridTimestamp tsForCatalogVersion = commit ? commitTimestamp : hybridClock.now();
+
+ return reliableCatalogVersionFor(tsForCatalogVersion)
.thenCompose(catalogVersion -> {
synchronized (commandProcessingLinearizationMutex) {
FinishTxCommandBuilder finishTxCmdBldr = MSG_FACTORY.finishTxCommand()
@@ -1507,9 +1553,9 @@ public class PartitionReplicaListener implements ReplicaListener {
}
return allOffFuturesExceptionIgnored(txUpdateFutures, request).thenCompose(v -> {
- long commandTimestamp = hybridClock.nowLong();
+ HybridTimestamp commandTimestamp = hybridClock.now();
- return reliableCatalogVersionFor(hybridTimestamp(commandTimestamp))
+ return reliableCatalogVersionFor(commandTimestamp)
.thenCompose(catalogVersion -> {
synchronized (commandProcessingLinearizationMutex) {
TxCleanupCommand txCleanupCmd = MSG_FACTORY.txCleanupCommand()
@@ -1811,13 +1857,14 @@ public class PartitionReplicaListener implements ReplicaListener {
* Processes multiple entries direct request for read only transaction.
*
* @param request Read only multiple entries request.
+ * @param opStartTimestamp Moment when the operation processing was started in this class.
* @return Result future.
*/
private CompletableFuture<List<BinaryRow>> processReadOnlyDirectMultiEntryAction(
- ReadOnlyDirectMultiRowReplicaRequest request
- ) {
+ ReadOnlyDirectMultiRowReplicaRequest request,
+ HybridTimestamp opStartTimestamp) {
List<BinaryTuple> primaryKeys = resolvePks(request.primaryKeys());
- HybridTimestamp readTimestamp = hybridClock.now();
+ HybridTimestamp readTimestamp = opStartTimestamp;
if (request.requestType() != RequestType.RO_GET_ALL) {
throw new IgniteInternalException(Replicator.REPLICA_COMMON_ERR,
@@ -1905,7 +1952,7 @@ public class PartitionReplicaListener implements ReplicaListener {
return completedFuture(new ReplicaResult(result, null));
}
- return validateOperationAgainstSchema(request.transactionId())
+ return validateWriteAgainstSchemaAfterTakingLocks(request.transactionId())
.thenCompose(catalogVersion -> awaitCleanup(rows, catalogVersion))
.thenCompose(
catalogVersion -> applyUpdateAllCommand(
@@ -1975,7 +2022,7 @@ public class PartitionReplicaListener implements ReplicaListener {
return allOf(insertLockFuts)
.thenCompose(ignored ->
// We are inserting completely new rows - no need to cleanup anything in this case, hence empty times.
- validateOperationAgainstSchema(request.transactionId())
+ validateWriteAgainstSchemaAfterTakingLocks(request.transactionId())
)
.thenCompose(catalogVersion -> applyUpdateAllCommand(
request,
@@ -2036,7 +2083,7 @@ public class PartitionReplicaListener implements ReplicaListener {
return completedFuture(new ReplicaResult(null, null));
}
- return validateOperationAgainstSchema(request.transactionId())
+ return validateWriteAgainstSchemaAfterTakingLocks(request.transactionId())
.thenCompose(catalogVersion -> awaitCleanup(rows, catalogVersion))
.thenCompose(
catalogVersion -> applyUpdateAllCommand(
@@ -2107,7 +2154,7 @@ public class PartitionReplicaListener implements ReplicaListener {
return completedFuture(result);
}
- return validateAtTimestamp(txId)
+ return validateRwReadAgainstSchemaAfterTakingLocks(txId)
.thenApply(unused -> new ReplicaResult(result, null));
});
}
@@ -2154,7 +2201,7 @@ public class PartitionReplicaListener implements ReplicaListener {
return completedFuture(new ReplicaResult(result, null));
}
- return validateOperationAgainstSchema(request.transactionId())
+ return validateWriteAgainstSchemaAfterTakingLocks(request.transactionId())
.thenCompose(catalogVersion -> awaitCleanup(rows, catalogVersion))
.thenCompose(
catalogVersion -> applyUpdateAllCommand(
@@ -2470,11 +2517,15 @@ public class PartitionReplicaListener implements ReplicaListener {
* Processes single entry direct request for read only transaction.
*
* @param request Read only single entry request.
+ * @param opStartTimestamp Moment when the operation processing was started in this class.
* @return Result future.
*/
- private CompletableFuture<BinaryRow> processReadOnlyDirectSingleEntryAction(ReadOnlyDirectSingleRowReplicaRequest request) {
+ private CompletableFuture<BinaryRow> processReadOnlyDirectSingleEntryAction(
+ ReadOnlyDirectSingleRowReplicaRequest request,
+ HybridTimestamp opStartTimestamp
+ ) {
BinaryTuple primaryKey = resolvePk(request.primaryKey());
- HybridTimestamp readTimestamp = hybridClock.now();
+ HybridTimestamp readTimestamp = opStartTimestamp;
if (request.requestType() != RequestType.RO_GET) {
throw new IgniteInternalException(Replicator.REPLICA_COMMON_ERR,
@@ -2511,7 +2562,7 @@ public class PartitionReplicaListener implements ReplicaListener {
return completedFuture(new ReplicaResult(false, request.full() ? null : completedFuture(null)));
}
- return validateOperationAgainstSchema(request.transactionId())
+ return validateWriteAgainstSchemaAfterTakingLocks(request.transactionId())
.thenCompose(catalogVersion -> awaitCleanup(validatedRowId, catalogVersion))
.thenCompose(
catalogVersion -> applyUpdateCommand(
@@ -2536,7 +2587,7 @@ public class PartitionReplicaListener implements ReplicaListener {
RowId rowId0 = new RowId(partId(), UUID.randomUUID());
return takeLocksForInsert(searchRow, rowId0, txId)
- .thenCompose(rowIdLock -> validateOperationAgainstSchema(request.transactionId())
+ .thenCompose(rowIdLock -> validateWriteAgainstSchemaAfterTakingLocks(request.transactionId())
.thenCompose(
catalogVersion -> applyUpdateCommand(
request,
@@ -2567,7 +2618,7 @@ public class PartitionReplicaListener implements ReplicaListener {
: takeLocksForUpdate(searchRow, rowId0, txId);
return lockFut
- .thenCompose(rowIdLock -> validateOperationAgainstSchema(request.transactionId())
+ .thenCompose(rowIdLock -> validateWriteAgainstSchemaAfterTakingLocks(request.transactionId())
.thenCompose(catalogVersion -> awaitCleanup(rowId, catalogVersion))
.thenCompose(
catalogVersion -> applyUpdateCommand(
@@ -2599,7 +2650,7 @@ public class PartitionReplicaListener implements ReplicaListener {
: takeLocksForUpdate(searchRow, rowId0, txId);
return lockFut
- .thenCompose(rowIdLock -> validateOperationAgainstSchema(request.transactionId())
+ .thenCompose(rowIdLock -> validateWriteAgainstSchemaAfterTakingLocks(request.transactionId())
.thenCompose(catalogVersion -> awaitCleanup(rowId, catalogVersion))
.thenCompose(
catalogVersion -> applyUpdateCommand(
@@ -2627,7 +2678,7 @@ public class PartitionReplicaListener implements ReplicaListener {
}
return takeLocksForUpdate(searchRow, rowId, txId)
- .thenCompose(rowIdLock -> validateOperationAgainstSchema(request.transactionId())
+ .thenCompose(rowIdLock -> validateWriteAgainstSchemaAfterTakingLocks(request.transactionId())
.thenCompose(catalogVersion -> awaitCleanup(rowId, catalogVersion))
.thenCompose(
catalogVersion -> applyUpdateCommand(
@@ -2655,7 +2706,7 @@ public class PartitionReplicaListener implements ReplicaListener {
}
return takeLocksForUpdate(searchRow, rowId, txId)
- .thenCompose(rowIdLock -> validateOperationAgainstSchema(request.transactionId())
+ .thenCompose(rowIdLock -> validateWriteAgainstSchemaAfterTakingLocks(request.transactionId())
.thenCompose(catalogVersion -> awaitCleanup(rowId, catalogVersion))
.thenCompose(
catalogVersion -> applyUpdateCommand(
@@ -2706,7 +2757,7 @@ public class PartitionReplicaListener implements ReplicaListener {
}
return takeLocksForGet(rowId, txId)
- .thenCompose(ignored -> validateAtTimestamp(txId))
+ .thenCompose(ignored -> validateRwReadAgainstSchemaAfterTakingLocks(txId))
.thenApply(ignored -> new ReplicaResult(row, null));
});
}
@@ -2717,7 +2768,7 @@ public class PartitionReplicaListener implements ReplicaListener {
}
return takeLocksForDelete(row, rowId, txId)
- .thenCompose(rowLock -> validateOperationAgainstSchema(request.transactionId()))
+ .thenCompose(rowLock -> validateWriteAgainstSchemaAfterTakingLocks(request.transactionId()))
.thenCompose(catalogVersion -> awaitCleanup(rowId, catalogVersion))
.thenCompose(
catalogVersion -> applyUpdateCommand(
@@ -2741,7 +2792,7 @@ public class PartitionReplicaListener implements ReplicaListener {
}
return takeLocksForDelete(row, rowId, txId)
- .thenCompose(ignored -> validateOperationAgainstSchema(request.transactionId()))
+ .thenCompose(ignored -> validateWriteAgainstSchemaAfterTakingLocks(request.transactionId()))
.thenCompose(catalogVersion -> awaitCleanup(rowId, catalogVersion))
.thenCompose(
catalogVersion -> applyUpdateCommand(
@@ -2985,7 +3036,7 @@ public class PartitionReplicaListener implements ReplicaListener {
return completedFuture(new ReplicaResult(false, null));
}
- return validateOperationAgainstSchema(txId)
+ return validateWriteAgainstSchemaAfterTakingLocks(txId)
.thenCompose(catalogVersion -> awaitCleanup(rowIdLock.get1(), catalogVersion))
.thenCompose(
catalogVersion -> applyUpdateCommand(
@@ -3259,25 +3310,26 @@ public class PartitionReplicaListener implements ReplicaListener {
return false;
}
- private CompletableFuture<Void> validateAtTimestamp(UUID txId) {
+ /**
+ * Takes current timestamp and makes schema related validations at this timestamp.
+ *
+ * @param txId Transaction ID.
+ * @return Future that will complete when validation completes.
+ */
+ private CompletableFuture<Void> validateRwReadAgainstSchemaAfterTakingLocks(UUID txId) {
HybridTimestamp operationTimestamp = hybridClock.now();
return schemaSyncService.waitForMetadataCompleteness(operationTimestamp)
- .thenApply(unused -> {
- failIfSchemaChangedSinceTxStart(txId, operationTimestamp);
-
- return null;
- });
+ .thenRun(() -> failIfSchemaChangedSinceTxStart(txId, operationTimestamp));
}
/**
- * Chooses operation timestamp and makes schema related validations. The operation timestamp is only used for validation,
- * it is NOT sent as safeTime timestamp.
+ * Takes current timestamp and makes schema related validations at this timestamp.
*
* @param txId Transaction ID.
* @return Future that will complete with catalog version associated with given operation though the operation timestamp.
*/
- private CompletableFuture<Integer> validateOperationAgainstSchema(UUID txId) {
+ private CompletableFuture<Integer> validateWriteAgainstSchemaAfterTakingLocks(UUID txId) {
HybridTimestamp operationTimestamp = hybridClock.now();
return reliableCatalogVersionFor(operationTimestamp)
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/SchemaCompatValidator.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/SchemaCompatValidator.java
index 9b37043388..90ca7d5d55 100644
--- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/SchemaCompatValidator.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/SchemaCompatValidator.java
@@ -32,7 +32,6 @@ import org.apache.ignite.internal.table.distributed.schema.FullTableSchema;
import org.apache.ignite.internal.table.distributed.schema.Schemas;
import org.apache.ignite.internal.table.distributed.schema.TableDefinitionDiff;
import org.apache.ignite.internal.tx.TransactionIds;
-import org.jetbrains.annotations.Nullable;
/**
* Validates schema compatibility.
@@ -48,19 +47,19 @@ class SchemaCompatValidator {
}
/**
- * Performs commit forward compatibility validation. That is, for each table enlisted in the transaction, checks to see whether the
- * initial schema (identified by the begin timestamp) is forward-compatible with the commit schema (identified by the commit
- * timestamp).
+ * Performs commit validation. That is, checks that each table enlisted in the tranasction still exists at the commit timestamp,
+ * and that the initial schema of the table (identified by the begin timestamp) is forward-compatible with the commit schema
+ * (identified by the commit timestamp).
*
* @param txId ID of the transaction that gets validated.
* @param enlistedGroupIds IDs of the partitions that are enlisted with the transaction.
- * @param commitTimestamp Commit timestamp (or {@code null} if it's an abort).
+ * @param commitTimestamp Commit timestamp.
* @return Future of validation result.
*/
- CompletableFuture<CompatValidationResult> validateForward(
+ CompletableFuture<CompatValidationResult> validateCommit(
UUID txId,
Collection<TablePartitionId> enlistedGroupIds,
- @Nullable HybridTimestamp commitTimestamp
+ HybridTimestamp commitTimestamp
) {
HybridTimestamp beginTimestamp = TransactionIds.beginTimestamp(txId);
@@ -68,23 +67,18 @@ class SchemaCompatValidator {
.map(TablePartitionId::tableId)
.collect(toSet());
- assert commitTimestamp != null;
// Using compareTo() instead of after()/begin() because the latter methods take clock skew into account
// which only makes sense when comparing 'unrelated' timestamps. beginTs and commitTs have a causal relationship,
// so we don't need to account for clock skew.
assert commitTimestamp.compareTo(beginTimestamp) > 0;
return schemas.waitForSchemasAvailability(commitTimestamp)
- .thenApply(ignored -> validateForwardSchemasCompatibility(tableIds, commitTimestamp, beginTimestamp));
+ .thenApply(ignored -> validateCommit(tableIds, commitTimestamp, beginTimestamp));
}
- private CompatValidationResult validateForwardSchemasCompatibility(
- Set<Integer> tableIds,
- HybridTimestamp commitTimestamp,
- HybridTimestamp beginTimestamp
- ) {
+ private CompatValidationResult validateCommit(Set<Integer> tableIds, HybridTimestamp commitTimestamp, HybridTimestamp beginTimestamp) {
for (int tableId : tableIds) {
- CompatValidationResult validationResult = validateForwardSchemaCompatibility(beginTimestamp, commitTimestamp, tableId);
+ CompatValidationResult validationResult = validateCommit(beginTimestamp, commitTimestamp, tableId);
if (!validationResult.isSuccessful()) {
return validationResult;
@@ -94,6 +88,29 @@ class SchemaCompatValidator {
return CompatValidationResult.success();
}
+ private CompatValidationResult validateCommit(HybridTimestamp beginTimestamp, HybridTimestamp commitTimestamp, int tableId) {
+ CatalogTableDescriptor tableAtCommitTs = catalogService.table(tableId, commitTimestamp.longValue());
+
+ if (tableAtCommitTs == null) {
+ CatalogTableDescriptor tableAtTxStart = catalogService.table(tableId, beginTimestamp.longValue());
+ assert tableAtTxStart != null : "No table " + tableId + " at ts " + beginTimestamp;
+
+ return CompatValidationResult.tableDropped(tableId, tableAtTxStart.schemaId());
+ }
+
+ return validateForwardSchemaCompatibility(beginTimestamp, commitTimestamp, tableId);
+ }
+
+ /**
+ * Performs forward compatibility validation. That is, for the given table, checks to see whether the
+ * initial schema (identified by the begin timestamp) is forward-compatible with the commit schema (identified by the commit
+ * timestamp).
+ *
+ * @param beginTimestamp Begin timestamp of a transaction.
+ * @param commitTimestamp Commit timestamp.
+ * @param tableId ID of the table that is under validation.
+ * @return Validation result.
+ */
private CompatValidationResult validateForwardSchemaCompatibility(
HybridTimestamp beginTimestamp,
HybridTimestamp commitTimestamp,
@@ -107,7 +124,7 @@ class SchemaCompatValidator {
FullTableSchema oldSchema = tableSchemas.get(i);
FullTableSchema newSchema = tableSchemas.get(i + 1);
if (!isForwardCompatible(oldSchema, newSchema)) {
- return CompatValidationResult.failure(tableId, oldSchema.schemaVersion(), newSchema.schemaVersion());
+ return CompatValidationResult.incompatibleChange(tableId, oldSchema.schemaVersion(), newSchema.schemaVersion());
}
}
@@ -160,7 +177,7 @@ class SchemaCompatValidator {
FullTableSchema oldSchema = tableSchemas.get(i);
FullTableSchema newSchema = tableSchemas.get(i + 1);
if (!isBackwardCompatible(oldSchema, newSchema)) {
- return CompatValidationResult.failure(tableId, oldSchema.schemaVersion(), newSchema.schemaVersion());
+ return CompatValidationResult.incompatibleChange(tableId, oldSchema.schemaVersion(), newSchema.schemaVersion());
}
}
@@ -178,7 +195,10 @@ class SchemaCompatValidator {
CatalogTableDescriptor tableAtOpTs = catalogService.table(tableId, operationTimestamp.longValue());
assert tableAtBeginTs != null;
- assert tableAtOpTs != null;
+
+ if (tableAtOpTs == null) {
+ throw tableWasDroppedException(tableId);
+ }
if (tableAtOpTs.tableVersion() != tableAtBeginTs.tableVersion()) {
throw new IncompatibleSchemaException(
@@ -189,4 +209,16 @@ class SchemaCompatValidator {
);
}
}
+
+ private static IncompatibleSchemaException tableWasDroppedException(int tableId) {
+ return new IncompatibleSchemaException(String.format("Table was dropped [table=%d]", tableId));
+ }
+
+ void failIfTableDoesNotExistAt(HybridTimestamp operationTimestamp, int tableId) {
+ CatalogTableDescriptor tableAtOpTs = catalogService.table(tableId, operationTimestamp.longValue());
+
+ if (tableAtOpTs == null) {
+ throw tableWasDroppedException(tableId);
+ }
+ }
}
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/schema/NonHistoricSchemas.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/schema/NonHistoricSchemas.java
index 83558e0f74..c6eb5f38c8 100644
--- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/schema/NonHistoricSchemas.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/schema/NonHistoricSchemas.java
@@ -52,13 +52,16 @@ import org.apache.ignite.internal.type.VarlenNativeType;
public class NonHistoricSchemas implements Schemas {
private final SchemaManager schemaManager;
- public NonHistoricSchemas(SchemaManager schemaManager) {
+ private final SchemaSyncService schemaSyncService;
+
+ public NonHistoricSchemas(SchemaManager schemaManager, SchemaSyncService schemaSyncService) {
this.schemaManager = schemaManager;
+ this.schemaSyncService = schemaSyncService;
}
@Override
public CompletableFuture<?> waitForSchemasAvailability(HybridTimestamp ts) {
- return completedFuture(null);
+ return schemaSyncService.waitForMetadataCompleteness(ts);
}
@Override
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/schema/SchemaSyncService.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/schema/SchemaSyncService.java
index aaaf4f47c0..b062860842 100644
--- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/schema/SchemaSyncService.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/schema/SchemaSyncService.java
@@ -23,6 +23,7 @@ import org.apache.ignite.internal.hlc.HybridTimestamp;
/**
* Implements Schema Synchronization wait logic as defined in IEP-98.
*/
+@SuppressWarnings("InterfaceMayBeAnnotatedFunctional")
public interface SchemaSyncService {
/**
* Waits till metadata (like table/index schemas) is complete for the given timestamp. The 'complete' here means
diff --git a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerTest.java b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerTest.java
index 68b824d48a..eedada14c4 100644
--- a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerTest.java
+++ b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerTest.java
@@ -94,6 +94,7 @@ import org.apache.ignite.internal.raft.service.LeaderWithTerm;
import org.apache.ignite.internal.raft.service.RaftGroupService;
import org.apache.ignite.internal.replicator.ReplicaResult;
import org.apache.ignite.internal.replicator.ReplicaService;
+import org.apache.ignite.internal.replicator.ReplicationGroupId;
import org.apache.ignite.internal.replicator.TablePartitionId;
import org.apache.ignite.internal.schema.BinaryRow;
import org.apache.ignite.internal.schema.BinaryRowConverter;
@@ -134,6 +135,10 @@ import org.apache.ignite.internal.table.distributed.index.IndexUpdateHandler;
import org.apache.ignite.internal.table.distributed.raft.PartitionDataStorage;
import org.apache.ignite.internal.table.distributed.replication.request.BinaryRowMessage;
import org.apache.ignite.internal.table.distributed.replication.request.BinaryTupleMessage;
+import org.apache.ignite.internal.table.distributed.replication.request.ReadOnlyDirectMultiRowReplicaRequest;
+import org.apache.ignite.internal.table.distributed.replication.request.ReadOnlyDirectSingleRowReplicaRequest;
+import org.apache.ignite.internal.table.distributed.replication.request.ReadOnlyMultiRowPkReplicaRequest;
+import org.apache.ignite.internal.table.distributed.replication.request.ReadOnlySingleRowPkReplicaRequest;
import org.apache.ignite.internal.table.distributed.replication.request.ReadWriteReplicaRequest;
import org.apache.ignite.internal.table.distributed.replicator.IncompatibleSchemaAbortException;
import org.apache.ignite.internal.table.distributed.replicator.IncompatibleSchemaException;
@@ -166,7 +171,6 @@ import org.apache.ignite.internal.util.Cursor;
import org.apache.ignite.internal.util.Lazy;
import org.apache.ignite.internal.util.PendingComparableValuesTracker;
import org.apache.ignite.lang.ErrorGroups.Transactions;
-import org.apache.ignite.lang.IgniteException;
import org.apache.ignite.network.ClusterNode;
import org.apache.ignite.network.ClusterNodeImpl;
import org.apache.ignite.network.MessagingService;
@@ -212,6 +216,8 @@ public class PartitionReplicaListenerTest extends IgniteAbstractTest {
private static final int TABLE_ID = 1;
+ private static final int ANOTHER_TABLE_ID = 2;
+
private final Map<UUID, Set<RowId>> pendingRows = new ConcurrentHashMap<>();
/** The storage stores partition data. */
@@ -576,18 +582,41 @@ public class PartitionReplicaListenerTest extends IgniteAbstractTest {
public void testReadOnlySingleRowReplicaRequestEmptyResult() throws Exception {
BinaryRow testBinaryKey = nextBinaryKey();
- CompletableFuture<ReplicaResult> fut = partitionReplicaListener.invoke(TABLE_MESSAGES_FACTORY.readOnlySingleRowPkReplicaRequest()
- .groupId(grpId)
- .readTimestampLong(clock.nowLong())
- .primaryKey(testBinaryKey.tupleSlice())
- .requestType(RequestType.RO_GET)
- .build(), localNode.id());
+ ByteBuffer pk = testBinaryKey.tupleSlice();
+
+ CompletableFuture<ReplicaResult> fut = doReadOnlySingleGet(pk);
BinaryRow binaryRow = (BinaryRow) fut.get(1, TimeUnit.SECONDS).result();
assertNull(binaryRow);
}
+ private CompletableFuture<ReplicaResult> doReadOnlySingleGet(ByteBuffer pk) {
+ return doReadOnlySingleGet(pk, clock.now());
+ }
+
+ private CompletableFuture<ReplicaResult> doReadOnlySingleGet(ByteBuffer pk, HybridTimestamp readTimestamp) {
+ ReadOnlySingleRowPkReplicaRequest request = TABLE_MESSAGES_FACTORY.readOnlySingleRowPkReplicaRequest()
+ .groupId(grpId)
+ .readTimestampLong(readTimestamp.longValue())
+ .primaryKey(pk)
+ .requestType(RequestType.RO_GET)
+ .build();
+
+ return partitionReplicaListener.invoke(request, localNode.id());
+ }
+
+ private CompletableFuture<ReplicaResult> doReadOnlyDirectSingleGet(ByteBuffer pk) {
+ ReadOnlyDirectSingleRowReplicaRequest request = TABLE_MESSAGES_FACTORY.readOnlyDirectSingleRowReplicaRequest()
+ .groupId(grpId)
+ .primaryKey(pk)
+ .requestType(RequestType.RO_GET)
+ .enlistmentConsistencyToken(1L)
+ .build();
+
+ return partitionReplicaListener.invoke(request, localNode.id());
+ }
+
@Test
public void testReadOnlySingleRowReplicaRequestCommittedResult() throws Exception {
UUID txId = newTxId();
@@ -599,12 +628,7 @@ public class PartitionReplicaListenerTest extends IgniteAbstractTest {
testMvPartitionStorage.addWrite(rowId, testBinaryRow, txId, TABLE_ID, PART_ID);
testMvPartitionStorage.commitWrite(rowId, clock.now());
- CompletableFuture<ReplicaResult> fut = partitionReplicaListener.invoke(TABLE_MESSAGES_FACTORY.readOnlySingleRowPkReplicaRequest()
- .groupId(grpId)
- .readTimestampLong(clock.nowLong())
- .primaryKey(testBinaryKey.tupleSlice())
- .requestType(RequestType.RO_GET)
- .build(), localNode.id());
+ CompletableFuture<ReplicaResult> fut = doReadOnlySingleGet(testBinaryKey.tupleSlice());
BinaryRow binaryRow = (BinaryRow) fut.get(1, TimeUnit.SECONDS).result();
@@ -623,12 +647,7 @@ public class PartitionReplicaListenerTest extends IgniteAbstractTest {
testMvPartitionStorage.addWrite(rowId, testBinaryRow, txId, TABLE_ID, PART_ID);
txManager.updateTxMeta(txId, old -> new TxStateMeta(TxState.COMMITED, localNode.id(), clock.now()));
- CompletableFuture<ReplicaResult> fut = partitionReplicaListener.invoke(TABLE_MESSAGES_FACTORY.readOnlySingleRowPkReplicaRequest()
- .groupId(grpId)
- .readTimestampLong(clock.nowLong())
- .primaryKey(testBinaryKey.tupleSlice())
- .requestType(RequestType.RO_GET)
- .build(), localNode.id());
+ CompletableFuture<ReplicaResult> fut = doReadOnlySingleGet(testBinaryKey.tupleSlice());
BinaryRow binaryRow = (BinaryRow) fut.get(1, TimeUnit.SECONDS).result();
@@ -646,12 +665,7 @@ public class PartitionReplicaListenerTest extends IgniteAbstractTest {
testMvPartitionStorage.addWrite(rowId, testBinaryRow, txId, TABLE_ID, PART_ID);
txManager.updateTxMeta(txId, old -> new TxStateMeta(TxState.PENDING, localNode.id(), null));
- CompletableFuture<ReplicaResult> fut = partitionReplicaListener.invoke(TABLE_MESSAGES_FACTORY.readOnlySingleRowPkReplicaRequest()
- .groupId(grpId)
- .readTimestampLong(clock.nowLong())
- .primaryKey(testBinaryKey.tupleSlice())
- .requestType(RequestType.RO_GET)
- .build(), localNode.id());
+ CompletableFuture<ReplicaResult> fut = doReadOnlySingleGet(testBinaryKey.tupleSlice());
BinaryRow binaryRow = (BinaryRow) fut.get(1, TimeUnit.SECONDS).result();
@@ -670,12 +684,7 @@ public class PartitionReplicaListenerTest extends IgniteAbstractTest {
testMvPartitionStorage.addWrite(rowId, testBinaryRow, txId, TABLE_ID, PART_ID);
txManager.updateTxMeta(txId, old -> new TxStateMeta(TxState.ABORTED, localNode.id(), null));
- CompletableFuture<ReplicaResult> fut = partitionReplicaListener.invoke(TABLE_MESSAGES_FACTORY.readOnlySingleRowPkReplicaRequest()
- .groupId(grpId)
- .readTimestampLong(clock.nowLong())
- .primaryKey(testBinaryKey.tupleSlice())
- .requestType(RequestType.RO_GET)
- .build(), localNode.id());
+ CompletableFuture<ReplicaResult> fut = doReadOnlySingleGet(testBinaryKey.tupleSlice());
BinaryRow binaryRow = (BinaryRow) fut.get(1, TimeUnit.SECONDS).result();
@@ -981,11 +990,11 @@ public class PartitionReplicaListenerTest extends IgniteAbstractTest {
}
@Test
- public void testWriteIntentOnPrimaryReplicaInsertUpdateDelete() throws MarshallerException {
+ public void testWriteIntentOnPrimaryReplicaInsertUpdateDelete() {
UUID txId = newTxId();
BinaryRow testRow = binaryRow(0);
- BinaryRow testRowPk = kvMarshaller.marshal(new TestKey(0, "k0"));
+ BinaryRow testRowPk = marshalQuietly(new TestKey(0, "k0"), kvMarshaller);
assertThat(doSingleRowRequest(txId, testRow, RequestType.RW_INSERT), willCompleteSuccessfully());
@@ -1032,8 +1041,16 @@ public class PartitionReplicaListenerTest extends IgniteAbstractTest {
cleanup(txId);
}
+ private static <K, V> Row marshalQuietly(K key, KvMarshaller<K, V> marshaller) {
+ try {
+ return marshaller.marshal(key);
+ } catch (MarshallerException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
@Test
- public void testWriteIntentOnPrimaryReplicaMultiRowOps() throws MarshallerException {
+ public void testWriteIntentOnPrimaryReplicaMultiRowOps() {
UUID txId = newTxId();
BinaryRow row0 = binaryRow(0);
BinaryRow row1 = binaryRow(1);
@@ -1056,8 +1073,8 @@ public class PartitionReplicaListenerTest extends IgniteAbstractTest {
checkRowInMvStorage(newRow1, true);
Collection<BinaryRow> newRowPks = List.of(
- kvMarshaller.marshal(new TestKey(0, "k0")),
- kvMarshaller.marshal(new TestKey(1, "k1"))
+ marshalQuietly(new TestKey(0, "k0"), kvMarshaller),
+ marshalQuietly(new TestKey(1, "k1"), kvMarshaller)
);
assertThat(doMultiRowPkRequest(txId, newRowPks, RequestType.RW_DELETE_ALL), willCompleteSuccessfully());
@@ -1099,6 +1116,10 @@ public class PartitionReplicaListenerTest extends IgniteAbstractTest {
}
private CompletableFuture<?> doSingleRowPkRequest(UUID txId, BinaryRow binaryRow, RequestType requestType) {
+ return doSingleRowPkRequest(txId, binaryRow, requestType, false);
+ }
+
+ private CompletableFuture<?> doSingleRowPkRequest(UUID txId, BinaryRow binaryRow, RequestType requestType, boolean full) {
return partitionReplicaListener.invoke(TABLE_MESSAGES_FACTORY.readWriteSingleRowPkReplicaRequest()
.groupId(grpId)
.transactionId(txId)
@@ -1106,6 +1127,7 @@ public class PartitionReplicaListenerTest extends IgniteAbstractTest {
.primaryKey(binaryRow.tupleSlice())
.term(1L)
.commitPartitionId(commitPartitionId())
+ .full(full)
.build(),
localNode.id()
);
@@ -1306,14 +1328,14 @@ public class PartitionReplicaListenerTest extends IgniteAbstractTest {
@Values(booleans = {false, true}) boolean upsertAfterDelete,
@Values(booleans = {false, true}) boolean committed,
@Values(booleans = {false, true}) boolean multiple
- ) throws MarshallerException {
+ ) {
BinaryRow br1 = binaryRow(1);
- BinaryRow br1Pk = kvMarshaller.marshal(new TestKey(1, "k" + 1));
+ BinaryRow br1Pk = marshalQuietly(new TestKey(1, "k" + 1), kvMarshaller);
BinaryRow br2 = binaryRow(2);
- BinaryRow br2Pk = kvMarshaller.marshal(new TestKey(2, "k" + 2));
+ BinaryRow br2Pk = marshalQuietly(new TestKey(2, "k" + 2), kvMarshaller);
if (insertFirst) {
UUID tx0 = newTxId();
@@ -1369,7 +1391,7 @@ public class PartitionReplicaListenerTest extends IgniteAbstractTest {
List<BinaryRow> expected = committed
? (upsertAfterDelete ? allRows : allRowsButModified)
: (insertFirst ? allRows : singletonList((BinaryRow) null));
- List<BinaryRow> res = roGetAll(allRowsPks, clock.nowLong());
+ List<BinaryRow> res = roGetAll(allRowsPks, clock.now());
assertEquals(allRows.size(), res.size());
@@ -1556,13 +1578,13 @@ public class PartitionReplicaListenerTest extends IgniteAbstractTest {
private BinaryRow marshalKeyOrKeyValue(RequestType requestType, TestKey key) {
try {
- return RequestTypes.isKeyOnly(requestType) ? kvMarshaller.marshal(key) : kvMarshaller.marshal(key, someValue);
+ return RequestTypes.isKeyOnly(requestType) ? marshalQuietly(key, kvMarshaller) : kvMarshaller.marshal(key, someValue);
} catch (MarshallerException e) {
throw new AssertionError(e);
}
}
- private void testFailsWhenReadingFromFutureIncompatibleSchema(ListenerInvocation listenerInvocation) {
+ private void testFailsWhenReadingFromFutureIncompatibleSchema(RwListenerInvocation listenerInvocation) {
UUID targetTxId = transactionIdFor(clock.now());
TestKey key = simulateWriteWithSchemaVersionFromFuture();
@@ -1700,13 +1722,13 @@ public class PartitionReplicaListenerTest extends IgniteAbstractTest {
}
@Test
- public void failsWhenFullScanReadsTupleWithIncompatibleSchemaFromFuture() {
+ public void failsWhenScanReadsTupleWithIncompatibleSchemaFromFuture() {
testFailsWhenReadingFromFutureIncompatibleSchema(
- (targetTxId, key) -> doRwFullScanRetrieveBatchRequest(targetTxId, false)
+ (targetTxId, key) -> doRwScanRetrieveBatchRequest(targetTxId)
);
}
- private CompletableFuture<?> doRwFullScanRetrieveBatchRequest(UUID targetTxId, boolean full) {
+ private CompletableFuture<?> doRwScanRetrieveBatchRequest(UUID targetTxId) {
return partitionReplicaListener.invoke(
TABLE_MESSAGES_FACTORY.readWriteScanRetrieveBatchReplicaRequest()
.groupId(grpId)
@@ -1714,7 +1736,32 @@ public class PartitionReplicaListenerTest extends IgniteAbstractTest {
.term(1L)
.scanId(1)
.batchSize(100)
- .full(full)
+ .full(false)
+ .build(),
+ localNode.id()
+ );
+ }
+
+ private CompletableFuture<?> doRwScanCloseRequest(UUID targetTxId) {
+ return partitionReplicaListener.invoke(
+ TABLE_MESSAGES_FACTORY.readWriteScanCloseReplicaRequest()
+ .groupId(grpId)
+ .transactionId(targetTxId)
+ .term(1L)
+ .scanId(1)
+ .build(),
+ localNode.id()
+ );
+ }
+
+ private CompletableFuture<?> doRoScanRetrieveBatchRequest(UUID targetTxId, HybridTimestamp readTimestamp) {
+ return partitionReplicaListener.invoke(
+ TABLE_MESSAGES_FACTORY.readOnlyScanRetrieveBatchReplicaRequest()
+ .groupId(grpId)
+ .transactionId(targetTxId)
+ .scanId(1)
+ .batchSize(100)
+ .readTimestampLong(readTimestamp.longValue())
.build(),
localNode.id()
);
@@ -1742,7 +1789,7 @@ public class PartitionReplicaListenerTest extends IgniteAbstractTest {
.map(Arguments::of);
}
- private void testWritesAreSuppliedWithRequiredCatalogVersion(RequestType requestType, ListenerInvocation listenerInvocation) {
+ private void testWritesAreSuppliedWithRequiredCatalogVersion(RequestType requestType, RwListenerInvocation listenerInvocation) {
TestKey key = nextKey();
if (RequestTypes.looksUpFirst(requestType)) {
@@ -1755,16 +1802,11 @@ public class PartitionReplicaListenerTest extends IgniteAbstractTest {
when(catalogService.activeCatalogVersion(anyLong())).thenReturn(42);
UUID targetTxId = newTxId();
- HybridTimestamp beginTs = TransactionIds.beginTimestamp(targetTxId);
CompletableFuture<?> future = listenerInvocation.invoke(targetTxId, key);
assertThat(future, willCompleteSuccessfully());
- // Make sure metadata completeness is awaited for (at operation timestamp, that is, later than beginTs).
- //noinspection ConstantConditions
- verify(schemaSyncService).waitForMetadataCompleteness(gt(beginTs));
-
// Make sure catalog required version is filled in the executed update command.
verify(mockRaftClient, atLeast(1)).run(commandCaptor.capture());
@@ -1822,21 +1864,21 @@ public class PartitionReplicaListenerTest extends IgniteAbstractTest {
boolean onExistingRow,
boolean full
) {
- ListenerInvocation invocation = null;
+ RwListenerInvocation invocation = null;
if (RequestTypes.isSingleRowRwPkOnly(requestType)) {
invocation = (targetTxId, key) -> {
- return doSingleRowPkRequest(targetTxId, marshalKeyOrKeyValue(requestType, key), requestType);
+ return doSingleRowPkRequest(targetTxId, marshalKeyOrKeyValue(requestType, key), requestType, full);
};
} else if (RequestTypes.isSingleRowRwFullRow(requestType)) {
invocation = (targetTxId, key) -> {
- return doSingleRowRequest(targetTxId, marshalKeyOrKeyValue(requestType, key), requestType);
+ return doSingleRowRequest(targetTxId, marshalKeyOrKeyValue(requestType, key), requestType, full);
};
} else {
fail("Uncovered type: " + requestType);
}
- testRwOperationsFailIfTableAlteredAfterTxStart(requestType, onExistingRow, invocation);
+ testRwOperationFailsIfTableWasAlteredAfterTxStart(requestType, onExistingRow, invocation);
}
@SuppressWarnings("unused")
@@ -1851,10 +1893,10 @@ public class PartitionReplicaListenerTest extends IgniteAbstractTest {
.filter(RequestTypes::isSingleRowRw);
}
- private void testRwOperationsFailIfTableAlteredAfterTxStart(
+ private void testRwOperationFailsIfTableWasAlteredAfterTxStart(
RequestType requestType,
boolean onExistingRow,
- ListenerInvocation listenerInvocation
+ RwListenerInvocation listenerInvocation
) {
TestKey key = nextKey();
@@ -1865,13 +1907,7 @@ public class PartitionReplicaListenerTest extends IgniteAbstractTest {
UUID txId = newTxId();
HybridTimestamp txBeginTs = TransactionIds.beginTimestamp(txId);
- CatalogTableDescriptor tableVersion1 = mock(CatalogTableDescriptor.class);
- CatalogTableDescriptor tableVersion2 = mock(CatalogTableDescriptor.class);
- when(tableVersion1.tableVersion()).thenReturn(CURRENT_SCHEMA_VERSION);
- when(tableVersion2.tableVersion()).thenReturn(NEXT_SCHEMA_VERSION);
-
- when(catalogService.table(TABLE_ID, txBeginTs.longValue())).thenReturn(tableVersion1);
- when(catalogService.table(eq(TABLE_ID), gt(txBeginTs.longValue()))).thenReturn(tableVersion2);
+ makeSchemaChangeAfter(txBeginTs);
CompletableFuture<?> future = listenerInvocation.invoke(txId, key);
@@ -1894,12 +1930,22 @@ public class PartitionReplicaListenerTest extends IgniteAbstractTest {
}
}
+ private void makeSchemaChangeAfter(HybridTimestamp txBeginTs) {
+ CatalogTableDescriptor tableVersion1 = mock(CatalogTableDescriptor.class);
+ CatalogTableDescriptor tableVersion2 = mock(CatalogTableDescriptor.class);
+ when(tableVersion1.tableVersion()).thenReturn(CURRENT_SCHEMA_VERSION);
+ when(tableVersion2.tableVersion()).thenReturn(NEXT_SCHEMA_VERSION);
+
+ when(catalogService.table(TABLE_ID, txBeginTs.longValue())).thenReturn(tableVersion1);
+ when(catalogService.table(eq(TABLE_ID), gt(txBeginTs.longValue()))).thenReturn(tableVersion2);
+ }
+
@CartesianTest
@CartesianTest.MethodFactory("multiRowRwOperationTypesFactory")
void multiRowRwOperationsFailIfTableAlteredAfterTxStart(
RequestType requestType, boolean onExistingRow, boolean full
) {
- ListenerInvocation invocation = null;
+ RwListenerInvocation invocation = null;
if (RequestTypes.isMultipleRowsRwPkOnly(requestType)) {
invocation = (targetTxId, key) -> {
@@ -1913,7 +1959,7 @@ public class PartitionReplicaListenerTest extends IgniteAbstractTest {
fail("Uncovered type: " + requestType);
}
- testRwOperationsFailIfTableAlteredAfterTxStart(requestType, onExistingRow, invocation);
+ testRwOperationFailsIfTableWasAlteredAfterTxStart(requestType, onExistingRow, invocation);
}
@SuppressWarnings("unused")
@@ -1933,7 +1979,7 @@ public class PartitionReplicaListenerTest extends IgniteAbstractTest {
@Values(booleans = {false, true}) boolean onExistingRow,
@Values(booleans = {false, true}) boolean full
) {
- testRwOperationsFailIfTableAlteredAfterTxStart(RequestType.RW_REPLACE, onExistingRow, (targetTxId, key) -> {
+ testRwOperationFailsIfTableWasAlteredAfterTxStart(RequestType.RW_REPLACE, onExistingRow, (targetTxId, key) -> {
return doReplaceRequest(
targetTxId,
marshalKeyOrKeyValue(RequestType.RW_REPLACE, key),
@@ -1944,15 +1990,232 @@ public class PartitionReplicaListenerTest extends IgniteAbstractTest {
}
@CartesianTest
- void rwScanRequestFailsIfTableAlteredAfterTxStart(
+ void rwScanRequestFailsIfTableAlteredAfterTxStart(@Values(booleans = {false, true}) boolean onExistingRow) {
+ testRwOperationFailsIfTableWasAlteredAfterTxStart(RequestType.RW_SCAN, onExistingRow, (targetTxId, key) -> {
+ return doRwScanRetrieveBatchRequest(targetTxId);
+ });
+ }
+
+ @Test
+ void rwScanCloseRequestSucceedsIfTableAlteredAfterTxStart() {
+ UUID txId = newTxId();
+ HybridTimestamp txBeginTs = TransactionIds.beginTimestamp(txId);
+
+ makeSchemaChangeAfter(txBeginTs);
+
+ CompletableFuture<?> future = doRwScanCloseRequest(txId);
+
+ assertThat(future, willCompleteSuccessfully());
+ }
+
+ @CartesianTest
+ @CartesianTest.MethodFactory("singleRowRwOperationTypesFactory")
+ void singleRowRwOperationsFailIfTableWasDropped(RequestType requestType, boolean onExistingRow, boolean full) {
+ RwListenerInvocation invocation = null;
+
+ if (RequestTypes.isSingleRowRwPkOnly(requestType)) {
+ invocation = (targetTxId, key) -> {
+ return doSingleRowPkRequest(targetTxId, marshalKeyOrKeyValue(requestType, key), requestType, full);
+ };
+ } else if (RequestTypes.isSingleRowRwFullRow(requestType)) {
+ invocation = (targetTxId, key) -> {
+ return doSingleRowRequest(targetTxId, marshalKeyOrKeyValue(requestType, key), requestType, full);
+ };
+ } else {
+ fail("Uncovered type: " + requestType);
+ }
+
+ testRwOperationFailsIfTableWasDropped(onExistingRow, invocation);
+ }
+
+ private void testRwOperationFailsIfTableWasDropped(boolean onExistingRow, RwListenerInvocation listenerInvocation) {
+ TestKey key = nextKey();
+
+ if (onExistingRow) {
+ upsertInNewTxFor(key);
+ }
+
+ UUID txId = newTxId();
+ HybridTimestamp txBeginTs = TransactionIds.beginTimestamp(txId);
+
+ makeTableBeDroppedAfter(txBeginTs);
+
+ CompletableFuture<?> future = listenerInvocation.invoke(txId, key);
+
+ IncompatibleSchemaException ex = assertWillThrowFast(future, IncompatibleSchemaException.class);
+ assertThat(ex.code(), is(Transactions.TX_INCOMPATIBLE_SCHEMA_ERR));
+ assertThat(ex.getMessage(), is("Table was dropped [table=1]"));
+ }
+
+ private void makeTableBeDroppedAfter(HybridTimestamp txBeginTs) {
+ makeTableBeDroppedAfter(txBeginTs, TABLE_ID);
+ }
+
+ private void makeTableBeDroppedAfter(HybridTimestamp txBeginTs, int tableId) {
+ CatalogTableDescriptor tableVersion1 = mock(CatalogTableDescriptor.class);
+ when(tableVersion1.tableVersion()).thenReturn(CURRENT_SCHEMA_VERSION);
+
+ when(catalogService.table(tableId, txBeginTs.longValue())).thenReturn(tableVersion1);
+ when(catalogService.table(eq(tableId), gt(txBeginTs.longValue()))).thenReturn(null);
+ }
+
+ @CartesianTest
+ @CartesianTest.MethodFactory("multiRowRwOperationTypesFactory")
+ void multiRowRwOperationsFailIfTableWasDropped(RequestType requestType, boolean onExistingRow, boolean full) {
+ RwListenerInvocation invocation = null;
+
+ if (RequestTypes.isMultipleRowsRwPkOnly(requestType)) {
+ invocation = (targetTxId, key) -> {
+ return doMultiRowPkRequest(targetTxId, List.of(marshalKeyOrKeyValue(requestType, key)), requestType, full);
+ };
+ } else if (RequestTypes.isMultipleRowsRwFullRows(requestType)) {
+ invocation = (targetTxId, key) -> {
+ return doMultiRowRequest(targetTxId, List.of(marshalKeyOrKeyValue(requestType, key)), requestType, full);
+ };
+ } else {
+ fail("Uncovered type: " + requestType);
+ }
+
+ testRwOperationFailsIfTableWasDropped(onExistingRow, invocation);
+ }
+
+ @CartesianTest
+ void replaceRequestFailsIfTableWasDropped(
@Values(booleans = {false, true}) boolean onExistingRow,
@Values(booleans = {false, true}) boolean full
) {
- testRwOperationsFailIfTableAlteredAfterTxStart(RequestType.RW_SCAN, onExistingRow, (targetTxId, key) -> {
- return doRwFullScanRetrieveBatchRequest(targetTxId, full);
+ testRwOperationFailsIfTableWasDropped(onExistingRow, (targetTxId, key) -> {
+ return doReplaceRequest(
+ targetTxId,
+ marshalKeyOrKeyValue(RequestType.RW_REPLACE, key),
+ marshalKeyOrKeyValue(RequestType.RW_REPLACE, key),
+ full
+ );
+ });
+ }
+
+ @CartesianTest
+ void rwScanRequestFailsIfTableWasDropped(@Values(booleans = {false, true}) boolean onExistingRow) {
+ testRwOperationFailsIfTableWasDropped(onExistingRow, (targetTxId, key) -> {
+ return doRwScanRetrieveBatchRequest(targetTxId);
+ });
+ }
+
+ @Test
+ void rwScanCloseRequestSucceedsIfTableWasDropped() {
+ UUID txId = newTxId();
+ HybridTimestamp txBeginTs = TransactionIds.beginTimestamp(txId);
+
+ makeTableBeDroppedAfter(txBeginTs);
+
+ CompletableFuture<?> future = doRwScanCloseRequest(txId);
+
+ assertThat(future, willCompleteSuccessfully());
+ }
+
+ @CartesianTest
+ void singleRowRoGetFailsIfTableWasDropped(
+ @Values(booleans = {false, true}) boolean direct,
+ @Values(booleans = {false, true}) boolean onExistingRow
+ ) {
+ testRoOperationFailsIfTableWasDropped(onExistingRow, (targetTxId, readTimestamp, key) -> {
+ if (direct) {
+ return doReadOnlyDirectSingleGet(marshalQuietly(key, kvMarshaller).tupleSlice());
+ } else {
+ return doReadOnlySingleGet(marshalQuietly(key, kvMarshaller).tupleSlice(), readTimestamp);
+ }
+ });
+ }
+
+ private void testRoOperationFailsIfTableWasDropped(boolean onExistingRow, RoListenerInvocation listenerInvocation) {
+ TestKey key = nextKey();
+
+ if (onExistingRow) {
+ upsertInNewTxFor(key);
+ }
+
+ UUID txId = newTxId();
+ HybridTimestamp readTs = clock.now();
+
+ when(catalogService.table(eq(TABLE_ID), anyLong())).thenReturn(null);
+
+ CompletableFuture<?> future = listenerInvocation.invoke(txId, readTs, key);
+
+ IncompatibleSchemaException ex = assertWillThrowFast(future, IncompatibleSchemaException.class);
+ assertThat(ex.code(), is(Transactions.TX_INCOMPATIBLE_SCHEMA_ERR));
+ assertThat(ex.getMessage(), is("Table was dropped [table=1]"));
+ }
+
+ @CartesianTest
+ void multiRowRoGetFailsIfTableWasDropped(
+ @Values(booleans = {false, true}) boolean direct,
+ @Values(booleans = {false, true}) boolean onExistingRow
+ ) {
+ testRoOperationFailsIfTableWasDropped(onExistingRow, (targetTxId, readTimestamp, key) -> {
+ if (direct) {
+ return doReadOnlyDirectMultiGet(List.of(marshalQuietly(key, kvMarshaller)));
+ } else {
+ return doReadOnlyMultiGet(List.of(marshalQuietly(key, kvMarshaller)), readTimestamp);
+ }
});
}
+ @CartesianTest
+ void roScanRequestFailsIfTableWasDropped(@Values(booleans = {false, true}) boolean onExistingRow) {
+ testRoOperationFailsIfTableWasDropped(onExistingRow, (targetTxId, readTimestamp, key) -> {
+ return doRoScanRetrieveBatchRequest(targetTxId, readTimestamp);
+ });
+ }
+
+ @Test
+ void commitRequestFailsIfCommitPartitionTableWasDropped() {
+ testCommitRequestIfTableWasDropped(grpId, Set.of(grpId), grpId.tableId());
+ }
+
+ @Test
+ void commitRequestFailsIfNonCommitPartitionTableWasDropped() {
+ TablePartitionId anotherPartitionId = new TablePartitionId(ANOTHER_TABLE_ID, 0);
+
+ testCommitRequestIfTableWasDropped(grpId, Set.of(grpId, anotherPartitionId), anotherPartitionId.tableId());
+ }
+
+ private void testCommitRequestIfTableWasDropped(
+ TablePartitionId commitPartitionId,
+ Set<ReplicationGroupId> groups,
+ int tableToBeDroppedId
+ ) {
+ when(schemas.tableSchemaVersionsBetween(anyInt(), any(), any(HybridTimestamp.class)))
+ .thenReturn(List.of(
+ tableSchema(CURRENT_SCHEMA_VERSION, List.of(nullableColumn("col")))
+ ));
+ when(txManager.cleanup(any(), any(), any(), anyBoolean(), any())).thenReturn(completedFuture(null));
+
+ AtomicReference<Boolean> committed = interceptFinishTxCommand();
+
+ UUID txId = newTxId();
+ HybridTimestamp txBeginTs = TransactionIds.beginTimestamp(txId);
+
+ makeTableBeDroppedAfter(txBeginTs, tableToBeDroppedId);
+
+ CompletableFuture<?> future = partitionReplicaListener.invoke(
+ TX_MESSAGES_FACTORY.txFinishReplicaRequest()
+ .groupId(commitPartitionId)
+ .groups(groups)
+ .txId(txId)
+ .term(1L)
+ .commit(true)
+ .commitTimestampLong(clock.nowLong())
+ .build(),
+ localNode.id()
+ );
+
+ IncompatibleSchemaAbortException ex = assertWillThrowFast(future, IncompatibleSchemaAbortException.class);
+ assertThat(ex.code(), is(Transactions.TX_COMMIT_ERR));
+ assertThat(ex.getMessage(), is("Commit failed because a table was already dropped [tableId=" + tableToBeDroppedId + "]"));
+
+ assertThat("The transaction must have been aborted", committed.get(), is(false));
+ }
+
private UUID newTxId() {
return transactionIdFor(clock.now());
}
@@ -1996,19 +2259,36 @@ public class PartitionReplicaListenerTest extends IgniteAbstractTest {
return (BinaryRow) future.join().result();
}
- private List<BinaryRow> roGetAll(Collection<BinaryRow> rows, long readTimestamp) {
- CompletableFuture<ReplicaResult> future = partitionReplicaListener.invoke(TABLE_MESSAGES_FACTORY.readOnlyMultiRowPkReplicaRequest()
- .groupId(grpId)
- .requestType(RequestType.RO_GET_ALL)
- .readTimestampLong(readTimestamp)
- .primaryKeys(rows.stream().map(BinaryRow::tupleSlice).collect(toList()))
- .build(),
- localNode.id()
- );
+ private List<BinaryRow> roGetAll(Collection<BinaryRow> rows, HybridTimestamp readTimestamp) {
+ CompletableFuture<ReplicaResult> future = doReadOnlyMultiGet(rows, readTimestamp);
+
+ assertThat(future, willCompleteSuccessfully());
return (List<BinaryRow>) future.join().result();
}
+ private CompletableFuture<ReplicaResult> doReadOnlyMultiGet(Collection<BinaryRow> rows, HybridTimestamp readTimestamp) {
+ ReadOnlyMultiRowPkReplicaRequest request = TABLE_MESSAGES_FACTORY.readOnlyMultiRowPkReplicaRequest()
+ .groupId(grpId)
+ .requestType(RequestType.RO_GET_ALL)
+ .readTimestampLong(readTimestamp.longValue())
+ .primaryKeys(rows.stream().map(BinaryRow::tupleSlice).collect(toList()))
+ .build();
+
+ return partitionReplicaListener.invoke(request, localNode.id());
+ }
+
+ private CompletableFuture<ReplicaResult> doReadOnlyDirectMultiGet(Collection<BinaryRow> rows) {
+ ReadOnlyDirectMultiRowReplicaRequest request = TABLE_MESSAGES_FACTORY.readOnlyDirectMultiRowReplicaRequest()
+ .groupId(grpId)
+ .requestType(RequestType.RO_GET_ALL)
+ .primaryKeys(rows.stream().map(BinaryRow::tupleSlice).collect(toList()))
+ .enlistmentConsistencyToken(1L)
+ .build();
+
+ return partitionReplicaListener.invoke(request, localNode.id());
+ }
+
private void cleanup(UUID txId) {
HybridTimestamp commitTs = clock.now();
@@ -2046,11 +2326,7 @@ public class PartitionReplicaListenerTest extends IgniteAbstractTest {
}
private BinaryRow nextBinaryKey() {
- try {
- return kvMarshaller.marshal(nextKey());
- } catch (MarshallerException e) {
- throw new IgniteException(e);
- }
+ return marshalQuietly(nextKey(), kvMarshaller);
}
private static TestKey nextKey() {
@@ -2085,14 +2361,6 @@ public class PartitionReplicaListenerTest extends IgniteAbstractTest {
}
}
- private TestValue value(BinaryRow binaryRow) {
- try {
- return kvMarshaller.unmarshalValue(Row.wrapBinaryRow(schemaDescriptor, binaryRow));
- } catch (MarshallerException e) {
- throw new IgniteException(e);
- }
- }
-
private static BinaryRowMessage binaryRowMessage(BinaryRow binaryRow) {
return TABLE_MESSAGES_FACTORY.binaryRowMessage()
.binaryTuple(binaryRow.tupleSlice())
@@ -2190,7 +2458,12 @@ public class PartitionReplicaListenerTest extends IgniteAbstractTest {
}
@FunctionalInterface
- private interface ListenerInvocation {
+ private interface RwListenerInvocation {
CompletableFuture<?> invoke(UUID targetTxId, TestKey key);
}
+
+ @FunctionalInterface
+ private interface RoListenerInvocation {
+ CompletableFuture<?> invoke(UUID targetTxId, HybridTimestamp readTimestamp, TestKey key);
+ }
}
diff --git a/modules/transactions/src/test/java/org/apache/ignite/internal/tx/TxManagerTest.java b/modules/transactions/src/test/java/org/apache/ignite/internal/tx/TxManagerTest.java
index 359e9da571..cd3fd04aa8 100644
--- a/modules/transactions/src/test/java/org/apache/ignite/internal/tx/TxManagerTest.java
+++ b/modules/transactions/src/test/java/org/apache/ignite/internal/tx/TxManagerTest.java
@@ -19,7 +19,7 @@ package org.apache.ignite.internal.tx;
import static java.lang.Math.abs;
-import static org.apache.ignite.internal.Hacks.IDLE_SAFE_TIME_PROPAGATION_PERIOD_MILLISECONDS_PROPERTY;
+import static org.apache.ignite.internal.Kludges.IDLE_SAFE_TIME_PROPAGATION_PERIOD_MILLISECONDS_PROPERTY;
import static org.apache.ignite.internal.hlc.HybridTimestamp.CLOCK_SKEW;
import static org.apache.ignite.internal.hlc.HybridTimestamp.hybridTimestamp;
import static org.apache.ignite.internal.replicator.ReplicaManager.idleSafeTimePropagationPeriodMs;