You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by ib...@apache.org on 2023/09/13 06:56:34 UTC
[ignite-3] branch main updated: IGNITE-20044 Validate schema eligibility on each read/write operation in an RW transaction (#2566)
This is an automated email from the ASF dual-hosted git repository.
ibessonov pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new 0d1ae2d1d6 IGNITE-20044 Validate schema eligibility on each read/write operation in an RW transaction (#2566)
0d1ae2d1d6 is described below
commit 0d1ae2d1d67724f43abfaa8af4a2f85b9e328656
Author: Roman Puchkovskiy <ro...@gmail.com>
AuthorDate: Wed Sep 13 10:56:27 2023 +0400
IGNITE-20044 Validate schema eligibility on each read/write operation in an RW transaction (#2566)
---
.../Table/SchemaSynchronizationTest.cs | 1 +
.../org/apache/ignite/internal/SessionUtils.java | 2 +-
.../inmemory/ItRaftStorageVolatilityTest.java | 55 +---
.../runner/app/PlatformTestNodeRunner.java | 216 ++++++++++++++-
.../ignite/internal/runner/app/TableTestUtils.java | 62 +++++
.../schemasync/ItSchemaSyncSingleNodeTest.java | 215 +++++++++++++++
.../ignite/internal/sql/api/ItCommonApiTest.java | 40 +--
.../ignite/internal/table/ItRoReadsTest.java | 50 ++--
.../org/apache/ignite/internal/app/IgniteImpl.java | 10 +
.../table/distributed/TableIdRegistry.java | 48 ++++
.../RequestType.java => TableIdTranslator.java} | 52 +---
.../internal/table/distributed/TableManager.java | 33 +++
.../distributed/replicator/CatalogTables.java | 42 +++
.../replicator/CatalogTablesWithIdConversion.java | 47 ++++
.../RequestType.java => DirectCatalogTables.java} | 54 ++--
.../replicator/PartitionReplicaListener.java | 298 +++++++++++++++------
.../replicator/SchemaCompatValidator.java | 24 +-
.../distributed/replicator/action/RequestType.java | 40 ++-
.../PartitionReplicaListenerIndexLockingTest.java | 8 +
.../replication/PartitionReplicaListenerTest.java | 289 ++++++++++++++++----
.../replicator/action/RequestTypeTest.java | 60 +++++
.../apache/ignite/distributed/ItTxTestCluster.java | 10 +
.../replicator/action/RequestTypes.java | 78 +++++-
.../table/impl/DummyInternalTableImpl.java | 11 +
24 files changed, 1425 insertions(+), 320 deletions(-)
diff --git a/modules/platforms/dotnet/Apache.Ignite.Tests/Table/SchemaSynchronizationTest.cs b/modules/platforms/dotnet/Apache.Ignite.Tests/Table/SchemaSynchronizationTest.cs
index 99f13d0e64..ea7cf08de8 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Tests/Table/SchemaSynchronizationTest.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Tests/Table/SchemaSynchronizationTest.cs
@@ -337,6 +337,7 @@ public class SchemaSynchronizationTest : IgniteTestsBase
}
[Test]
+ [Ignore("https://issues.apache.org/jira/browse/IGNITE-20399")]
public async Task TestSchemaUpdateWhileStreaming([Values(true, false)] bool insertNewColumn)
{
await Client.Sql.ExecuteAsync(null, $"CREATE TABLE {TestTableName} (KEY bigint PRIMARY KEY)");
diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/SessionUtils.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/SessionUtils.java
index d0562a1373..ad99e6567d 100644
--- a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/SessionUtils.java
+++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/SessionUtils.java
@@ -35,7 +35,7 @@ public class SessionUtils {
* be executed in an implicit transaction.
*/
public static void executeUpdate(String sql, Session session, @Nullable Transaction transaction) {
- try (ResultSet ignored = session.execute(transaction, sql)) {
+ try (ResultSet<?> ignored = session.execute(transaction, sql)) {
// Do nothing, just adhere to the syntactic ceremony...
}
}
diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/inmemory/ItRaftStorageVolatilityTest.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/inmemory/ItRaftStorageVolatilityTest.java
index 5bd78aa357..e865b7573f 100644
--- a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/inmemory/ItRaftStorageVolatilityTest.java
+++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/inmemory/ItRaftStorageVolatilityTest.java
@@ -20,9 +20,7 @@ package org.apache.ignite.internal.inmemory;
import static ca.seinesoftware.hamcrest.path.PathMatcher.exists;
import static java.nio.charset.StandardCharsets.UTF_8;
import static java.util.stream.Collectors.toList;
-import static org.apache.ignite.internal.distributionzones.DistributionZoneManager.DEFAULT_PARTITION_COUNT;
-import static org.apache.ignite.internal.distributionzones.DistributionZoneManager.DEFAULT_ZONE_NAME;
-import static org.apache.ignite.internal.distributionzones.DistributionZonesTestUtil.createZone;
+import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.everyItem;
@@ -36,19 +34,13 @@ import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;
+import java.util.concurrent.CompletableFuture;
import java.util.stream.Stream;
import org.apache.ignite.internal.ClusterPerTestIntegrationTest;
import org.apache.ignite.internal.app.IgniteImpl;
-import org.apache.ignite.internal.distributionzones.DistributionZoneManager;
import org.apache.ignite.internal.raft.configuration.EntryCountBudgetChange;
import org.apache.ignite.internal.raft.configuration.RaftConfiguration;
-import org.apache.ignite.internal.schema.testutils.SchemaConfigurationConverter;
-import org.apache.ignite.internal.schema.testutils.builder.SchemaBuilders;
-import org.apache.ignite.internal.schema.testutils.definition.ColumnType;
-import org.apache.ignite.internal.schema.testutils.definition.TableDefinition;
-import org.apache.ignite.internal.storage.pagememory.configuration.schema.VolatilePageMemoryDataStorageChange;
import org.apache.ignite.internal.table.distributed.TableManager;
-import org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher;
import org.junit.jupiter.api.Test;
import org.rocksdb.ColumnFamilyDescriptor;
import org.rocksdb.ColumnFamilyHandle;
@@ -228,45 +220,24 @@ class ItRaftStorageVolatilityTest extends ClusterPerTestIntegrationTest {
@Test
void logSpillsOutToDisk() {
- node(0).nodeConfiguration().getConfiguration(RaftConfiguration.KEY).change(cfg -> {
- cfg.changeVolatileRaft(change -> {
- change.changeLogStorage(budgetChange -> budgetChange.convert(EntryCountBudgetChange.class).changeEntriesCountLimit(1));
- });
- });
-
createTableWithMaxOneInMemoryEntryAllowed("PERSON");
executeSql("INSERT INTO PERSON(ID, NAME) VALUES (1, 'JOHN')");
executeSql("INSERT INTO PERSON(ID, NAME) VALUES (2, 'JANE')");
}
+ @SuppressWarnings("resource")
private void createTableWithMaxOneInMemoryEntryAllowed(String tableName) {
- DistributionZoneManager distributionZoneManager = node(0).distributionZoneManager();
-
- String zoneName = "zone1";
-
- createZone(
- distributionZoneManager,
- zoneName,
- 1,
- DEFAULT_PARTITION_COUNT,
- dataStorageChange -> dataStorageChange.convert(VolatilePageMemoryDataStorageChange.class)
- );
-
- int zoneId = distributionZoneManager.getZoneId(zoneName);
-
- TableDefinition tableDef = SchemaBuilders.tableBuilder("PUBLIC", tableName).columns(
- SchemaBuilders.column("ID", ColumnType.INT32).build(),
- SchemaBuilders.column("NAME", ColumnType.string()).asNullable(true).build()
- ).withPrimaryKey("ID").build();
+ CompletableFuture<Void> configUpdateFuture = node(0).nodeConfiguration().getConfiguration(RaftConfiguration.KEY).change(cfg -> {
+ cfg.changeVolatileRaft(change -> {
+ change.changeLogStorage(budgetChange -> budgetChange.convert(EntryCountBudgetChange.class).changeEntriesCountLimit(1));
+ });
+ });
+ assertThat(configUpdateFuture, willCompleteSuccessfully());
- assertThat(
- ((TableManager) node(0).tables()).createTableAsync(
- tableName,
- DEFAULT_ZONE_NAME,
- tableChange -> SchemaConfigurationConverter.convert(tableDef, tableChange).changeZoneId(zoneId)
- ),
- CompletableFutureMatcher.willCompleteSuccessfully()
- );
+ cluster.doInSession(0, session -> {
+ session.execute(null, "create zone zone1 engine aimem with partitions=1, replicas=1");
+ session.execute(null, "create table " + tableName + " (id int primary key, name varchar) with primary_zone='ZONE1'");
+ });
}
}
diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/PlatformTestNodeRunner.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/PlatformTestNodeRunner.java
index b5dc5c9222..331027c3b4 100644
--- a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/PlatformTestNodeRunner.java
+++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/PlatformTestNodeRunner.java
@@ -18,10 +18,29 @@
package org.apache.ignite.internal.runner.app;
import static java.util.stream.Collectors.toList;
-import static org.apache.ignite.internal.distributionzones.DistributionZonesTestUtil.createZone;
+import static org.apache.ignite.internal.catalog.CatalogService.DEFAULT_SCHEMA_NAME;
+import static org.apache.ignite.internal.runner.app.TableTestUtils.createTable;
import static org.apache.ignite.internal.testframework.IgniteTestUtils.await;
import static org.apache.ignite.internal.testframework.IgniteTestUtils.escapeWindowsPath;
import static org.apache.ignite.internal.testframework.IgniteTestUtils.getResourcePath;
+import static org.apache.ignite.sql.ColumnType.BITMASK;
+import static org.apache.ignite.sql.ColumnType.BOOLEAN;
+import static org.apache.ignite.sql.ColumnType.BYTE_ARRAY;
+import static org.apache.ignite.sql.ColumnType.DATE;
+import static org.apache.ignite.sql.ColumnType.DATETIME;
+import static org.apache.ignite.sql.ColumnType.DECIMAL;
+import static org.apache.ignite.sql.ColumnType.DOUBLE;
+import static org.apache.ignite.sql.ColumnType.FLOAT;
+import static org.apache.ignite.sql.ColumnType.INT16;
+import static org.apache.ignite.sql.ColumnType.INT32;
+import static org.apache.ignite.sql.ColumnType.INT64;
+import static org.apache.ignite.sql.ColumnType.INT8;
+import static org.apache.ignite.sql.ColumnType.NUMBER;
+import static org.apache.ignite.sql.ColumnType.STRING;
+import static org.apache.ignite.sql.ColumnType.TIME;
+import static org.apache.ignite.sql.ColumnType.TIMESTAMP;
+import static org.apache.ignite.sql.ColumnType.UUID;
+import static org.junit.jupiter.api.Assertions.assertEquals;
import io.netty.util.ResourceLeakDetector;
import java.io.IOException;
@@ -39,6 +58,7 @@ import org.apache.ignite.compute.ComputeJob;
import org.apache.ignite.compute.JobExecutionContext;
import org.apache.ignite.internal.app.IgniteImpl;
import org.apache.ignite.internal.binarytuple.BinaryTupleReader;
+import org.apache.ignite.internal.catalog.commands.ColumnParams;
import org.apache.ignite.internal.client.proto.ColumnTypeConverter;
import org.apache.ignite.internal.configuration.BasicAuthenticationProviderChange;
import org.apache.ignite.internal.configuration.SecurityConfiguration;
@@ -258,16 +278,16 @@ public class PlatformTestNodeRunner {
private static void createTables(Ignite node) {
var keyCol = "key";
- createZone(((IgniteImpl) node).distributionZoneManager(), ZONE_NAME, 10, 1);
+ IgniteImpl ignite = (IgniteImpl) node;
- TableDefinition schTbl = SchemaBuilders.tableBuilder(SCHEMA_NAME, TABLE_NAME).columns(
- SchemaBuilders.column(keyCol, ColumnType.INT64).build(),
- SchemaBuilders.column("val", ColumnType.string()).asNullable(true).build()
- ).withPrimaryKey(keyCol).build();
+ try (Session session = node.sql().createSession()) {
+ session.execute(null, "CREATE ZONE \"" + ZONE_NAME + "\" WITH partitions=10, replicas=1");
+ }
- await(((TableManager) node.tables()).createTableAsync(schTbl.name(), ZONE_NAME, tblCh ->
- SchemaConfigurationConverter.convert(schTbl, tblCh)
- ));
+ try (Session session = node.sql().createSession()) {
+ session.execute(null, "CREATE TABLE " + TABLE_NAME + "(" + keyCol + " bigint PRIMARY KEY, val varchar) WITH primary_zone='"
+ + ZONE_NAME + "'");
+ }
int maxTimePrecision = TemporalColumnType.MAX_TIME_PRECISION;
@@ -298,6 +318,36 @@ public class PlatformTestNodeRunner {
SchemaConfigurationConverter.convert(schTblAll, tblCh)
));
+ createTable(
+ ignite.catalogManager(),
+ DEFAULT_SCHEMA_NAME,
+ ZONE_NAME,
+ TABLE_NAME_ALL_COLUMNS.toUpperCase(),
+ List.of(
+ ColumnParams.builder().name(keyCol.toUpperCase()).type(INT64).build(),
+ ColumnParams.builder().name("STR").type(STRING).nullable(true).build(),
+ ColumnParams.builder().name("INT8").type(INT8).nullable(true).build(),
+ ColumnParams.builder().name("INT16").type(INT16).nullable(true).build(),
+ ColumnParams.builder().name("INT32").type(INT32).nullable(true).build(),
+ ColumnParams.builder().name("INT64").type(INT64).nullable(true).build(),
+ ColumnParams.builder().name("FLOAT").type(FLOAT).nullable(true).build(),
+ ColumnParams.builder().name("DOUBLE").type(DOUBLE).nullable(true).build(),
+ ColumnParams.builder().name("UUID").type(UUID).nullable(true).build(),
+ ColumnParams.builder().name("DATE").type(DATE).nullable(true).build(),
+ ColumnParams.builder().name("BITMASK").type(BITMASK).length(64).nullable(true).build(),
+ ColumnParams.builder().name("TIME").type(TIME).precision(maxTimePrecision).nullable(true).build(),
+ ColumnParams.builder().name("TIME2").type(TIME).precision(2).nullable(true).build(),
+ ColumnParams.builder().name("DATETIME").type(DATETIME).precision(maxTimePrecision).nullable(true).build(),
+ ColumnParams.builder().name("DATETIME2").type(DATETIME).precision(3).nullable(true).build(),
+ ColumnParams.builder().name("TIMESTAMP").type(TIMESTAMP).precision(maxTimePrecision).nullable(true).build(),
+ ColumnParams.builder().name("TIMESTAMP2").type(TIMESTAMP).precision(4).nullable(true).build(),
+ ColumnParams.builder().name("BLOB").type(BYTE_ARRAY).nullable(true).build(),
+ ColumnParams.builder().name("DECIMAL").type(DECIMAL).precision(19).scale(3).nullable(true).build(),
+ ColumnParams.builder().name("BOOLEAN").type(BOOLEAN).nullable(true).build()
+ ),
+ List.of(keyCol.toUpperCase())
+ );
+
// TODO IGNITE-18431 remove extra table, use TABLE_NAME_ALL_COLUMNS for SQL tests.
TableDefinition schTblAllSql = SchemaBuilders.tableBuilder(SCHEMA_NAME, TABLE_NAME_ALL_COLUMNS_SQL).columns(
SchemaBuilders.column(keyCol, ColumnType.INT64).build(),
@@ -325,6 +375,36 @@ public class PlatformTestNodeRunner {
SchemaConfigurationConverter.convert(schTblAllSql, tblCh)
));
+ // TODO IGNITE-18431 remove extra table, use TABLE_NAME_ALL_COLUMNS for SQL tests.
+ createTable(
+ ignite.catalogManager(),
+ DEFAULT_SCHEMA_NAME,
+ ZONE_NAME,
+ TABLE_NAME_ALL_COLUMNS_SQL.toUpperCase(),
+ List.of(
+ ColumnParams.builder().name(keyCol.toUpperCase()).type(INT64).build(),
+ ColumnParams.builder().name("STR").type(STRING).nullable(true).build(),
+ ColumnParams.builder().name("INT8").type(INT8).nullable(true).build(),
+ ColumnParams.builder().name("INT16").type(INT16).nullable(true).build(),
+ ColumnParams.builder().name("INT32").type(INT32).nullable(true).build(),
+ ColumnParams.builder().name("INT64").type(INT64).nullable(true).build(),
+ ColumnParams.builder().name("FLOAT").type(FLOAT).nullable(true).build(),
+ ColumnParams.builder().name("DOUBLE").type(DOUBLE).nullable(true).build(),
+ ColumnParams.builder().name("UUID").type(UUID).nullable(true).build(),
+ ColumnParams.builder().name("DATE").type(DATE).nullable(true).build(),
+ ColumnParams.builder().name("TIME").type(TIME).precision(maxTimePrecision).nullable(true).build(),
+ ColumnParams.builder().name("TIME2").type(TIME).precision(maxTimePrecision).nullable(true).build(),
+ ColumnParams.builder().name("DATETIME").type(DATETIME).precision(maxTimePrecision).nullable(true).build(),
+ ColumnParams.builder().name("DATETIME2").type(DATETIME).precision(maxTimePrecision).nullable(true).build(),
+ ColumnParams.builder().name("TIMESTAMP").type(TIMESTAMP).precision(maxTimePrecision).nullable(true).build(),
+ ColumnParams.builder().name("TIMESTAMP2").type(TIMESTAMP).precision(maxTimePrecision).nullable(true).build(),
+ ColumnParams.builder().name("BLOB").type(BYTE_ARRAY).nullable(true).build(),
+ ColumnParams.builder().name("DECIMAL").type(DECIMAL).precision(19).scale(3).nullable(true).build(),
+ ColumnParams.builder().name("BOOLEAN").type(BOOLEAN).nullable(true).build()
+ ),
+ List.of(keyCol.toUpperCase())
+ );
+
createTwoColumnTable(node, ColumnType.INT8);
createTwoColumnTable(node, ColumnType.BOOLEAN);
createTwoColumnTable(node, ColumnType.INT16);
@@ -342,6 +422,108 @@ public class PlatformTestNodeRunner {
createTwoColumnTable(node, ColumnType.number());
createTwoColumnTable(node, ColumnType.blob());
createTwoColumnTable(node, ColumnType.bitmaskOf(32));
+
+ createTwoColumnTableInCatalog(
+ ignite,
+ ColumnParams.builder().name("KEY").type(INT8).build(),
+ ColumnParams.builder().name("VAL").type(INT8).nullable(true).build()
+ );
+
+ createTwoColumnTableInCatalog(
+ ignite,
+ ColumnParams.builder().name("KEY").type(BOOLEAN).build(),
+ ColumnParams.builder().name("VAL").type(BOOLEAN).nullable(true).build()
+ );
+
+ createTwoColumnTableInCatalog(
+ ignite,
+ ColumnParams.builder().name("KEY").type(INT16).build(),
+ ColumnParams.builder().name("VAL").type(INT16).nullable(true).build()
+ );
+
+ createTwoColumnTableInCatalog(
+ ignite,
+ ColumnParams.builder().name("KEY").type(INT32).build(),
+ ColumnParams.builder().name("VAL").type(INT32).nullable(true).build()
+ );
+
+ createTwoColumnTableInCatalog(
+ ignite,
+ ColumnParams.builder().name("KEY").type(INT64).build(),
+ ColumnParams.builder().name("VAL").type(INT64).nullable(true).build()
+ );
+
+ createTwoColumnTableInCatalog(
+ ignite,
+ ColumnParams.builder().name("KEY").type(FLOAT).build(),
+ ColumnParams.builder().name("VAL").type(FLOAT).nullable(true).build()
+ );
+
+ createTwoColumnTableInCatalog(
+ ignite,
+ ColumnParams.builder().name("KEY").type(DOUBLE).build(),
+ ColumnParams.builder().name("VAL").type(DOUBLE).nullable(true).build()
+ );
+
+ createTwoColumnTableInCatalog(
+ ignite,
+ ColumnParams.builder().name("KEY").type(org.apache.ignite.sql.ColumnType.UUID).build(),
+ ColumnParams.builder().name("VAL").type(org.apache.ignite.sql.ColumnType.UUID).nullable(true).build()
+ );
+
+ createTwoColumnTableInCatalog(
+ ignite,
+ ColumnParams.builder().name("KEY").type(DECIMAL).precision(19).scale(3).build(),
+ ColumnParams.builder().name("VAL").type(DECIMAL).precision(19).scale(3).nullable(true).build()
+ );
+
+ createTwoColumnTableInCatalog(
+ ignite,
+ ColumnParams.builder().name("KEY").type(STRING).build(),
+ ColumnParams.builder().name("VAL").type(STRING).nullable(true).build()
+ );
+
+ createTwoColumnTableInCatalog(
+ ignite,
+ ColumnParams.builder().name("KEY").type(DATE).build(),
+ ColumnParams.builder().name("VAL").type(DATE).nullable(true).build()
+ );
+
+ createTwoColumnTableInCatalog(
+ ignite,
+ ColumnParams.builder().name("KEY").type(DATETIME).precision(6).build(),
+ ColumnParams.builder().name("VAL").type(DATETIME).precision(6).nullable(true).build()
+ );
+
+ createTwoColumnTableInCatalog(
+ ignite,
+ ColumnParams.builder().name("KEY").type(TIME).precision(6).build(),
+ ColumnParams.builder().name("VAL").type(TIME).precision(6).nullable(true).build()
+ );
+
+ createTwoColumnTableInCatalog(
+ ignite,
+ ColumnParams.builder().name("KEY").type(TIMESTAMP).precision(6).build(),
+ ColumnParams.builder().name("VAL").type(TIMESTAMP).precision(6).nullable(true).build()
+ );
+
+ createTwoColumnTableInCatalog(
+ ignite,
+ ColumnParams.builder().name("KEY").type(NUMBER).precision(Integer.MAX_VALUE).build(),
+ ColumnParams.builder().name("VAL").type(NUMBER).precision(Integer.MAX_VALUE).nullable(true).build()
+ );
+
+ createTwoColumnTableInCatalog(
+ ignite,
+ ColumnParams.builder().name("KEY").type(BYTE_ARRAY).build(),
+ ColumnParams.builder().name("VAL").type(BYTE_ARRAY).nullable(true).build()
+ );
+
+ createTwoColumnTableInCatalog(
+ ignite,
+ ColumnParams.builder().name("KEY").type(BITMASK).length(32).build(),
+ ColumnParams.builder().name("VAL").type(BITMASK).length(32).nullable(true).build()
+ );
}
private static void createTwoColumnTable(Ignite node, ColumnType type) {
@@ -357,6 +539,22 @@ public class PlatformTestNodeRunner {
));
}
+ private static void createTwoColumnTableInCatalog(IgniteImpl ignite, ColumnParams keyColumnParams, ColumnParams valueColumnParams) {
+ assertEquals(keyColumnParams.type(), valueColumnParams.type());
+
+ org.apache.ignite.sql.ColumnType typeForTableName = keyColumnParams.type();
+ String typeName = typeForTableName == BYTE_ARRAY ? "BYTES" : typeForTableName.name();
+
+ createTable(
+ ignite.catalogManager(),
+ DEFAULT_SCHEMA_NAME,
+ ZONE_NAME,
+ ("tbl_" + typeName).toUpperCase(),
+ List.of(keyColumnParams, valueColumnParams),
+ List.of(keyColumnParams.name())
+ );
+ }
+
/**
* Gets the thin client port.
*
diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/TableTestUtils.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/TableTestUtils.java
new file mode 100644
index 0000000000..f8add797f0
--- /dev/null
+++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/TableTestUtils.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.runner.app;
+
+import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
+import static org.hamcrest.MatcherAssert.assertThat;
+
+import java.util.List;
+import org.apache.ignite.internal.catalog.CatalogCommand;
+import org.apache.ignite.internal.catalog.CatalogManager;
+import org.apache.ignite.internal.catalog.commands.ColumnParams;
+import org.apache.ignite.internal.catalog.commands.CreateTableCommand;
+
+/**
+ * Utils to manage tables inside tests.
+ */
+// TODO: IGNITE-19502 - remove after switching to the Catalog.
+public class TableTestUtils {
+ /**
+ * Creates table in the catalog.
+ *
+ * @param catalogManager Catalog manager.
+ * @param schemaName Schema name.
+ * @param zoneName Zone name.
+ * @param tableName Table name.
+ * @param columns Table columns.
+ * @param pkColumns Primary key columns.
+ */
+ public static void createTable(
+ CatalogManager catalogManager,
+ String schemaName,
+ String zoneName,
+ String tableName,
+ List<ColumnParams> columns,
+ List<String> pkColumns
+ ) {
+ CatalogCommand command = CreateTableCommand.builder()
+ .schemaName(schemaName)
+ .zone(zoneName)
+ .tableName(tableName)
+ .columns(columns)
+ .primaryKeyColumns(pkColumns)
+ .build();
+
+ assertThat(catalogManager.execute(command), willCompleteSuccessfully());
+ }
+}
diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/schemasync/ItSchemaSyncSingleNodeTest.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/schemasync/ItSchemaSyncSingleNodeTest.java
new file mode 100644
index 0000000000..ac82e5d461
--- /dev/null
+++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/schemasync/ItSchemaSyncSingleNodeTest.java
@@ -0,0 +1,215 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.schemasync;
+
+import static org.apache.ignite.internal.SessionUtils.executeUpdate;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.containsString;
+import static org.hamcrest.Matchers.is;
+import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
+import org.apache.ignite.internal.Cluster;
+import org.apache.ignite.internal.ClusterPerTestIntegrationTest;
+import org.apache.ignite.internal.app.IgniteImpl;
+import org.apache.ignite.internal.table.distributed.replicator.IncompatibleSchemaException;
+import org.apache.ignite.internal.tx.InternalTransaction;
+import org.apache.ignite.internal.tx.TxState;
+import org.apache.ignite.lang.ErrorGroups.Transactions;
+import org.apache.ignite.lang.IgniteException;
+import org.apache.ignite.table.Table;
+import org.apache.ignite.table.Tuple;
+import org.apache.ignite.tx.Transaction;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.EnumSource;
+
+/**
+ * Tests about basic Schema Synchronization properties that can be tested using just one Ignite node.
+ */
+class ItSchemaSyncSingleNodeTest extends ClusterPerTestIntegrationTest {
+ private static final int NODES_TO_START = 1;
+
+ private static final String TABLE_NAME = "test";
+ private static final String UNRELATED_TABLE_NAME = "unrelated_table";
+
+ private static final int KEY = 1;
+
+ private IgniteImpl node;
+
+ @Override
+ protected int initialNodes() {
+ return NODES_TO_START;
+ }
+
+ @BeforeEach
+ void assignNode() {
+ node = cluster.node(0);
+ }
+
+ /**
+ * Makes sure that the following sequence results in an operation failure and transaction rollback.
+ *
+ * <ol>
+ * <li>A table is created</li>
+ * <li>A transaction is started</li>
+ * <li>The table is enlisted in the transaction</li>
+ * <li>The table is ALTERed</li>
+ * <li>An attempt to read or write to the table in the transaction is made</li>
+ * </ol>
+ */
+ @ParameterizedTest
+ @EnumSource(Operation.class)
+ void readWriteOperationInTxAfterAlteringSchemaOnTargetTableIsRejected(Operation operation) {
+ cluster.doInSession(0, session -> {
+ executeUpdate("CREATE TABLE " + TABLE_NAME + " (id int PRIMARY KEY, val varchar)", session);
+ });
+
+ Table table = node.tables().table(TABLE_NAME);
+
+ putPreExistingValueTo(table);
+
+ InternalTransaction tx = (InternalTransaction) node.transactions().begin();
+
+ enlistTableInTransaction(table, tx);
+
+ alterTable(TABLE_NAME);
+
+ IgniteException ex;
+
+ if (operation.sql()) {
+ ex = assertThrows(IgniteException.class, () -> operation.execute(table, tx, cluster));
+ assertThat(
+ ex.getMessage(),
+ containsString("Table schema was updated after the transaction was started [table=1, startSchema=1, operationSchema=2")
+ );
+ } else {
+ ex = assertThrows(IncompatibleSchemaException.class, () -> operation.execute(table, tx, cluster));
+ assertThat(
+ ex.getMessage(),
+ is("Table schema was updated after the transaction was started [table=1, startSchema=1, operationSchema=2]")
+ );
+ }
+
+ assertThat(ex.code(), is(Transactions.TX_INCOMPATIBLE_SCHEMA_ERR));
+
+ // TODO: IGNITE-20342 - Assert for SQL too.
+ if (!operation.sql()) {
+ assertThat(tx.state(), is(TxState.ABORTED));
+ }
+ }
+
+ private void alterTable(String tableName) {
+ cluster.doInSession(0, session -> {
+ executeUpdate("ALTER TABLE " + tableName + " ADD COLUMN added int", session);
+ });
+ }
+
+ private static void putPreExistingValueTo(Table table) {
+ table.keyValueView().put(null, Tuple.create().set("id", KEY), Tuple.create().set("val", "original"));
+ }
+
+ private void enlistTableInTransaction(Table table, Transaction tx) {
+ executeReadOn(table, tx, cluster);
+ }
+
+ private static void executeReadOn(Table table, Transaction tx, Cluster cluster) {
+ cluster.doInSession(0, session -> {
+ executeUpdate("SELECT * FROM " + table.name(), session, tx);
+ });
+ }
+
+ private enum Operation {
+ KV_WRITE {
+ @Override
+ void execute(Table table, Transaction tx, Cluster cluster) {
+ putInTx(table, tx);
+ }
+
+ @Override
+ boolean sql() {
+ return false;
+ }
+ },
+ KV_READ {
+ @Override
+ void execute(Table table, Transaction tx, Cluster cluster) {
+ table.keyValueView().get(tx, Tuple.create().set("id", KEY));
+ }
+
+ @Override
+ boolean sql() {
+ return false;
+ }
+ },
+ SQL_WRITE {
+ @Override
+ void execute(Table table, Transaction tx, Cluster cluster) {
+ cluster.doInSession(0, session -> {
+ executeUpdate("UPDATE " + table.name() + " SET val = 'new value' WHERE id = " + KEY, session, tx);
+ });
+ }
+
+ @Override
+ boolean sql() {
+ return true;
+ }
+ },
+ SQL_READ {
+ @Override
+ void execute(Table table, Transaction tx, Cluster cluster) {
+ executeReadOn(table, tx, cluster);
+ }
+
+ @Override
+ boolean sql() {
+ return true;
+ }
+ };
+
+ abstract void execute(Table table, Transaction tx, Cluster cluster);
+
+ abstract boolean sql();
+ }
+
+ private static void putInTx(Table table, Transaction tx) {
+ table.keyValueView().put(tx, Tuple.create().set("id", 1), Tuple.create().set("val", "one"));
+ }
+
+ /**
+ * Makes sure that ALTERing a table does not affect a transaction in which that table is not enlisted.
+ */
+ @Test
+ void readWriteOperationInTxAfterAlteringSchemaOnAnotherTableIsUnaffected() {
+ cluster.doInSession(0, session -> {
+ executeUpdate("CREATE TABLE " + TABLE_NAME + " (id int PRIMARY KEY, val varchar)", session);
+ executeUpdate("CREATE TABLE " + UNRELATED_TABLE_NAME + " (id int PRIMARY KEY, val varchar)", session);
+ });
+
+ Table table = node.tables().table(TABLE_NAME);
+
+ InternalTransaction tx = (InternalTransaction) node.transactions().begin();
+
+ enlistTableInTransaction(table, tx);
+
+ alterTable(UNRELATED_TABLE_NAME);
+
+ assertDoesNotThrow(() -> putInTx(table, tx));
+ }
+}
diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/api/ItCommonApiTest.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/api/ItCommonApiTest.java
index 53bff13425..aec8028f5b 100644
--- a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/api/ItCommonApiTest.java
+++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/api/ItCommonApiTest.java
@@ -17,9 +17,7 @@
package org.apache.ignite.internal.sql.api;
-import static org.apache.ignite.internal.distributionzones.DistributionZoneManager.DEFAULT_ZONE_NAME;
import static org.apache.ignite.internal.testframework.IgniteTestUtils.assertThrowsWithCause;
-import static org.apache.ignite.internal.testframework.IgniteTestUtils.await;
import static org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCondition;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.containsString;
@@ -34,16 +32,10 @@ import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import org.apache.calcite.schema.SchemaPlus;
import org.apache.ignite.Ignite;
-import org.apache.ignite.internal.schema.testutils.SchemaConfigurationConverter;
-import org.apache.ignite.internal.schema.testutils.builder.SchemaBuilders;
-import org.apache.ignite.internal.schema.testutils.definition.ColumnType;
-import org.apache.ignite.internal.schema.testutils.definition.ColumnType.TemporalColumnType;
-import org.apache.ignite.internal.schema.testutils.definition.TableDefinition;
import org.apache.ignite.internal.sql.engine.ClusterPerClassIntegrationTest;
import org.apache.ignite.internal.sql.engine.QueryCancelledException;
import org.apache.ignite.internal.sql.engine.SqlQueryProcessor;
import org.apache.ignite.internal.sql.engine.schema.SqlSchemaManager;
-import org.apache.ignite.internal.table.distributed.TableManager;
import org.apache.ignite.internal.testframework.IgniteTestUtils;
import org.apache.ignite.internal.tx.InternalTransaction;
import org.apache.ignite.internal.tx.TxManager;
@@ -111,37 +103,27 @@ public class ItCommonApiTest extends ClusterPerClassIntegrationTest {
@Test
public void checkTimestampOperations() {
String kvTblName = "tbl_all_columns_sql";
- String schemaName = "PUBLIC";
- String keyCol = "key";
- int maxTimePrecision = TemporalColumnType.MAX_TIME_PRECISION;
+ String keyCol = "KEY";
Ignite node = CLUSTER_NODES.get(0);
// TODO: https://issues.apache.org/jira/browse/IGNITE-19162 Trim all less than millisecond information from timestamp
//String tsStr = "2023-03-29T08:22:33.005007Z";
- String tsStr = "2023-03-29T08:22:33.005Z";
+ // TODO: IGNITE-20105 it should be "2023-03-29T08:22:33.005Z";
+ String tsStr = "2023-03-29T08:22:33Z";
Instant ins = Instant.parse(tsStr);
sql("CREATE TABLE timestamps(id INTEGER PRIMARY KEY, i TIMESTAMP(9))");
- TableDefinition schTblAllSql = SchemaBuilders.tableBuilder(schemaName, kvTblName).columns(
- SchemaBuilders.column(keyCol, ColumnType.INT64).build(),
- SchemaBuilders.column("time", ColumnType.time(maxTimePrecision)).asNullable(true).build(),
- SchemaBuilders.column("timestamp", ColumnType.timestamp(maxTimePrecision)).asNullable(true).build(),
- SchemaBuilders.column("datetime", ColumnType.datetime(maxTimePrecision)).asNullable(true).build()
- ).withPrimaryKey(keyCol).build();
-
- await(((TableManager) node.tables()).createTableAsync(schTblAllSql.name(), DEFAULT_ZONE_NAME, tblCh ->
- SchemaConfigurationConverter.convert(schTblAllSql, tblCh)
- ));
+ // TODO: IGNITE-19274 Add column with TIMESTAMP WITH LOCAL TIME ZONE
+ sql(String.format("CREATE TABLE %s(\"%s\" INTEGER PRIMARY KEY, \"TIMESTAMP\" TIMESTAMP(9))", kvTblName, keyCol));
Table tbl = node.tables().table(kvTblName);
Tuple rec = Tuple.create()
- .set("KEY", 1L)
- .set("TIMESTAMP", ins)
- .set("DATETIME", LocalDateTime.of(2023, 1, 18, 18, 9, 29));
+ .set("KEY", 1)
+ .set("TIMESTAMP", LocalDateTime.of(2023, 3, 29, 8, 22, 33));
tbl.recordView().insert(null, rec);
@@ -156,13 +138,15 @@ public class ItCommonApiTest extends ClusterPerClassIntegrationTest {
String srtRepr = ins.toString();
- assertEquals(srtRepr.substring(0, srtRepr.length() - 1), res.next().datetimeValue(0).toString());
+ String expDateTimeStr = srtRepr.substring(0, srtRepr.length() - 1);
+
+ assertEquals(expDateTimeStr, res.next().datetimeValue(0).toString());
- String query = "select \"KEY\", \"TIME\", \"DATETIME\", \"TIMESTAMP\" from TBL_ALL_COLUMNS_SQL ORDER BY KEY";
+ String query = "select \"KEY\", \"TIMESTAMP\" from TBL_ALL_COLUMNS_SQL ORDER BY KEY";
res = ses.execute(null, query);
- assertEquals(ins, res.next().timestampValue(3));
+ assertEquals(expDateTimeStr, res.next().datetimeValue(1).toString());
}
}
diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/table/ItRoReadsTest.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/table/ItRoReadsTest.java
index 8b89bf2a37..0bc84b1bb0 100644
--- a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/table/ItRoReadsTest.java
+++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/table/ItRoReadsTest.java
@@ -18,16 +18,14 @@
package org.apache.ignite.internal.table;
import static org.apache.ignite.internal.distributionzones.DistributionZoneManager.DEFAULT_REPLICA_COUNT;
-import static org.apache.ignite.internal.distributionzones.DistributionZonesTestUtil.createZone;
-import static org.apache.ignite.internal.runner.app.ItTablesApiTest.SCHEMA;
import static org.apache.ignite.internal.schema.BinaryRowMatcher.equalToRow;
-import static org.apache.ignite.internal.schema.testutils.SchemaConfigurationConverter.convert;
import static org.apache.ignite.internal.testframework.IgniteTestUtils.testNodeName;
import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.contains;
import static org.hamcrest.Matchers.is;
import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.fail;
@@ -43,7 +41,6 @@ import org.apache.ignite.Ignite;
import org.apache.ignite.IgnitionManager;
import org.apache.ignite.InitParameters;
import org.apache.ignite.internal.app.IgniteImpl;
-import org.apache.ignite.internal.distributionzones.DistributionZonesTestUtil;
import org.apache.ignite.internal.logger.IgniteLogger;
import org.apache.ignite.internal.logger.Loggers;
import org.apache.ignite.internal.schema.BinaryRow;
@@ -53,10 +50,6 @@ import org.apache.ignite.internal.schema.NativeTypes;
import org.apache.ignite.internal.schema.SchemaDescriptor;
import org.apache.ignite.internal.schema.row.Row;
import org.apache.ignite.internal.schema.row.RowAssembler;
-import org.apache.ignite.internal.schema.testutils.builder.SchemaBuilders;
-import org.apache.ignite.internal.schema.testutils.definition.ColumnDefinition;
-import org.apache.ignite.internal.schema.testutils.definition.ColumnType;
-import org.apache.ignite.internal.table.distributed.TableManager;
import org.apache.ignite.internal.testframework.BaseIgniteAbstractTest;
import org.apache.ignite.internal.testframework.TestIgnitionManager;
import org.apache.ignite.internal.testframework.WorkDirectory;
@@ -64,6 +57,7 @@ import org.apache.ignite.internal.testframework.WorkDirectoryExtension;
import org.apache.ignite.internal.tx.InternalTransaction;
import org.apache.ignite.internal.util.IgniteUtils;
import org.apache.ignite.lang.IgniteStringFormatter;
+import org.apache.ignite.sql.Session;
import org.apache.ignite.table.KeyValueView;
import org.apache.ignite.table.Table;
import org.apache.ignite.table.Tuple;
@@ -83,7 +77,7 @@ import org.junit.jupiter.api.extension.ExtendWith;
public class ItRoReadsTest extends BaseIgniteAbstractTest {
private static final IgniteLogger LOG = Loggers.forClass(ItRoReadsTest.class);
- private static final String TABLE_NAME = "some-table";
+ private static final String TABLE_NAME = "SOME_TABLE";
private static final SchemaDescriptor SCHEMA_1 = new SchemaDescriptor(
1,
@@ -509,28 +503,36 @@ public class ItRoReadsTest extends BaseIgniteAbstractTest {
}
private static Table startTable(Ignite node, String tableName) {
- List<ColumnDefinition> cols = new ArrayList<>();
- cols.add(SchemaBuilders.column("key", ColumnType.INT64).build());
- cols.add(SchemaBuilders.column("valInt", ColumnType.INT32).asNullable(true).build());
- cols.add(SchemaBuilders.column("valStr", ColumnType.string()).withDefaultValue("default").build());
+ String zoneName = zoneNameForTable(tableName);
+
+ try (Session session = node.sql().createSession()) {
+ session.execute(null, String.format("create zone \"%s\" with partitions=1, replicas=%d", zoneName, DEFAULT_REPLICA_COUNT));
+
+ session.execute(null,
+ String.format(
+ "create table \"%s\" (key bigint primary key, valInt int, valStr varchar default 'default') "
+ + "with primary_zone='%s'",
+ tableName, zoneName
+ )
+ );
+ }
- String zoneName = "zone_" + tableName;
- createZone(((IgniteImpl) node).distributionZoneManager(), zoneName, 1, DEFAULT_REPLICA_COUNT);
+ Table table = node.tables().table(tableName);
- CompletableFuture<Table> createTableCompletableFuture = ((TableManager) node.tables()).createTableAsync(
- tableName,
- zoneName,
- tblCh -> convert(SchemaBuilders.tableBuilder(SCHEMA, tableName).columns(cols).withPrimaryKey("key").build(), tblCh)
- );
+ assertNotNull(table);
- assertThat(createTableCompletableFuture, willCompleteSuccessfully());
+ return table;
+ }
- return createTableCompletableFuture.join();
+ private static String zoneNameForTable(String tableName) {
+ return "ZONE_" + tableName;
}
private static void stopTable(Ignite node, String tableName) {
- assertThat(((TableManager) node.tables()).dropTableAsync(tableName), willCompleteSuccessfully());
- DistributionZonesTestUtil.dropZone(((IgniteImpl) node).distributionZoneManager(), "zone_" + tableName);
+ try (Session session = node.sql().createSession()) {
+ session.execute(null, "drop table " + tableName);
+ session.execute(null, "drop zone " + zoneNameForTable(tableName));
+ }
}
protected static int nodes() {
diff --git a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
index 37c9062aca..990284ff99 100644
--- a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
+++ b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
@@ -1132,4 +1132,14 @@ public class IgniteImpl implements Ignite {
public PlacementDriver placementDriver() {
return placementDriverMgr.placementDriver();
}
+
+ /**
+ * Returns the CatalogManager.
+ *
+ * @return Catalog manager.
+ */
+ @TestOnly
+ public CatalogManager catalogManager() {
+ return catalogManager;
+ }
}
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableIdRegistry.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableIdRegistry.java
new file mode 100644
index 0000000000..80b6c1e903
--- /dev/null
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableIdRegistry.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.table.distributed;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * Registry for configTableId->catalogTableId pairs.
+ */
+// TODO: IGNITE-20386 - remove after the switch to the Catalog.
+public class TableIdRegistry implements TableIdTranslator {
+ private final Map<Integer, Integer> configToCatalogIds = new ConcurrentHashMap<>();
+
+ @Override
+ public int configIdToCatalogId(int configTableId) {
+ Integer catalogId = configToCatalogIds.get(configTableId);
+
+ assert catalogId != null : "Translating " + configTableId + ", but nothing was found";
+
+ return catalogId;
+ }
+
+ /**
+ * Registers a mapping of one table ID to another (both belong to the same table).
+ *
+ * @param configTableId Config-defined table ID.
+ * @param catalogTableId Catalog-defined table ID.
+ */
+ public void registerMapping(int configTableId, int catalogTableId) {
+ configToCatalogIds.put(configTableId, catalogTableId);
+ }
+}
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/action/RequestType.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableIdTranslator.java
similarity index 58%
copy from modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/action/RequestType.java
copy to modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableIdTranslator.java
index 6baae3a1c5..fa5eacf80c 100644
--- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/action/RequestType.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableIdTranslator.java
@@ -15,47 +15,19 @@
* limitations under the License.
*/
-package org.apache.ignite.internal.table.distributed.replicator.action;
+package org.apache.ignite.internal.table.distributed;
/**
- * Transaction operation type.
+ * Translates configuration-defined table IDs to Catalog-defined table IDs.
*/
-public enum RequestType {
- RW_GET,
-
- RW_GET_ALL,
-
- RW_DELETE,
-
- RW_DELETE_ALL,
-
- RW_DELETE_EXACT,
-
- RW_DELETE_EXACT_ALL,
-
- RW_INSERT,
-
- RW_INSERT_ALL,
-
- RW_UPSERT,
-
- RW_UPSERT_ALL,
-
- RW_REPLACE,
-
- RW_REPLACE_IF_EXIST,
-
- RW_GET_AND_DELETE,
-
- RW_GET_AND_REPLACE,
-
- RW_GET_AND_UPSERT,
-
- RW_SCAN,
-
- RO_GET,
-
- RO_GET_ALL,
-
- RO_SCAN
+@SuppressWarnings("InterfaceMayBeAnnotatedFunctional")
+// TODO: IGNITE-20386 - remove after the switch to the Catalog.
+public interface TableIdTranslator {
+ /**
+ * Translates a configuration-defined table ID to a Catalog-defined table ID.
+ *
+ * @param configTableId Configuration-defined table ID to translate.
+ * @return Catalog-defined table ID corresponding to the argument.
+ */
+ int configIdToCatalogId(int configTableId);
}
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
index b63129b35f..1bbb0b1e45 100644
--- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
@@ -93,6 +93,8 @@ import org.apache.ignite.internal.catalog.CatalogService;
import org.apache.ignite.internal.catalog.descriptors.CatalogDataStorageDescriptor;
import org.apache.ignite.internal.catalog.descriptors.CatalogTableDescriptor;
import org.apache.ignite.internal.catalog.descriptors.CatalogZoneDescriptor;
+import org.apache.ignite.internal.catalog.events.CatalogEvent;
+import org.apache.ignite.internal.catalog.events.CreateTableEventParameters;
import org.apache.ignite.internal.causality.CompletionListener;
import org.apache.ignite.internal.causality.IncrementalVersionedValue;
import org.apache.ignite.internal.cluster.management.ClusterManagementGroupManager;
@@ -165,6 +167,7 @@ import org.apache.ignite.internal.table.distributed.raft.snapshot.PartitionKey;
import org.apache.ignite.internal.table.distributed.raft.snapshot.PartitionSnapshotStorageFactory;
import org.apache.ignite.internal.table.distributed.raft.snapshot.outgoing.OutgoingSnapshotsManager;
import org.apache.ignite.internal.table.distributed.raft.snapshot.outgoing.SnapshotAwarePartitionDataStorage;
+import org.apache.ignite.internal.table.distributed.replicator.CatalogTablesWithIdConversion;
import org.apache.ignite.internal.table.distributed.replicator.PartitionReplicaListener;
import org.apache.ignite.internal.table.distributed.replicator.TransactionStateResolver;
import org.apache.ignite.internal.table.distributed.schema.NonHistoricSchemas;
@@ -395,6 +398,8 @@ public class TableManager extends Producer<TableEvent, TableEventParameters> imp
/** Placement driver. */
private final PlacementDriver placementDriver;
+ private final TableIdRegistry tableIdTranslator = new TableIdRegistry();
+
/**
* Creates a new table manager.
*
@@ -582,6 +587,32 @@ public class TableManager extends Producer<TableEvent, TableEventParameters> imp
});
addMessageHandler(clusterService.messagingService());
+
+ // TODO: IGNITE-19499 - remove when switched to the Catalog.
+ recoverTableIdsMapping();
+
+ // TODO: IGNITE-19499 - remove when switched to the Catalog.
+ catalogService.listen(CatalogEvent.TABLE_CREATE, (parameters, exception) -> {
+ CreateTableEventParameters event = (CreateTableEventParameters) parameters;
+
+ TableView tableConfig = tablesCfg.tables().value().get(event.tableDescriptor().name());
+
+ assert tableConfig != null : "No table config found by name " + event.tableDescriptor().name();
+
+ tableIdTranslator.registerMapping(tableConfig.id(), event.tableId());
+
+ return completedFuture(false);
+ });
+ }
+
+ private void recoverTableIdsMapping() {
+ tablesCfg.tables().value().forEach(tableView -> {
+ CatalogTableDescriptor tableDescriptor = catalogService.table(tableView.name(), Long.MAX_VALUE);
+
+ assert tableDescriptor != null : "No table in the Catalog with name " + tableView.name();
+
+ tableIdTranslator.registerMapping(tableView.id(), tableDescriptor.id());
+ });
}
private CompletableFuture<Void> performRebalanceOnRecovery(long revision) {
@@ -1056,6 +1087,8 @@ public class TableManager extends Producer<TableEvent, TableEventParameters> imp
indexBuilder,
schemaSyncService,
catalogService,
+ // TODO: IGNITE-19499 - replace with DirectCatalogTables
+ new CatalogTablesWithIdConversion(catalogService, tableIdTranslator),
tablesCfg,
placementDriver
);
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/CatalogTables.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/CatalogTables.java
new file mode 100644
index 0000000000..d68c36b0d9
--- /dev/null
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/CatalogTables.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.table.distributed.replicator;
+
+import org.apache.ignite.internal.catalog.CatalogService;
+import org.apache.ignite.internal.catalog.descriptors.CatalogTableDescriptor;
+import org.apache.ignite.internal.table.distributed.schema.SchemaSyncService;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * This interface is needed to cope with the fact that for some time we have both configuration-defined and Catalog-defined
+ * table IDs (which are usually not equal to each other even for the same table); it allows hiding the
+ * translation logic as most of the code operates with config-defined table IDs, while {@link CatalogService} requires
+ * its own table IDs.
+ */
+// TODO: IGNITE-20386 - remove after the switch to the Catalog.
+@SuppressWarnings("InterfaceMayBeAnnotatedFunctional")
+public interface CatalogTables {
+ /**
+ * Gets a table by a timestamp. Make sure there is enough metadata (see {@link SchemaSyncService}) before invoking this method.
+ *
+ * @param tableId ID of the table (before the switch to the Catalog, these are config-defined IDs).
+ * @param timestamp Timestamp at which to look for a table.
+ * @return Table descriptor or {@code null} if none was found.
+ */
+ @Nullable CatalogTableDescriptor table(int tableId, long timestamp);
+}
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/CatalogTablesWithIdConversion.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/CatalogTablesWithIdConversion.java
new file mode 100644
index 0000000000..caadf73246
--- /dev/null
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/CatalogTablesWithIdConversion.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.table.distributed.replicator;
+
+import org.apache.ignite.internal.catalog.CatalogService;
+import org.apache.ignite.internal.catalog.descriptors.CatalogTableDescriptor;
+import org.apache.ignite.internal.table.distributed.TableIdTranslator;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Implementation which accepts configuration-defined table IDs, converts them under the hood to Catalog-defined ones
+ * and uses them to access table information in the {@link CatalogService}.
+ */
+// TODO: IGNITE-20386 - replace all usages with DirectCatalogTables and remove after the switch to the Catalog.
+public class CatalogTablesWithIdConversion implements CatalogTables {
+ private final CatalogService catalogService;
+
+ private final TableIdTranslator tableIdTranslator;
+
+ /** Constructor. */
+ public CatalogTablesWithIdConversion(CatalogService catalogService, TableIdTranslator tableIdTranslator) {
+ this.catalogService = catalogService;
+ this.tableIdTranslator = tableIdTranslator;
+ }
+
+ @Override
+ public @Nullable CatalogTableDescriptor table(int tableId, long timestamp) {
+ int catalogTableId = tableIdTranslator.configIdToCatalogId(tableId);
+
+ return catalogService.table(catalogTableId, timestamp);
+ }
+}
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/action/RequestType.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/DirectCatalogTables.java
similarity index 50%
copy from modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/action/RequestType.java
copy to modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/DirectCatalogTables.java
index 6baae3a1c5..20b73d7977 100644
--- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/action/RequestType.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/DirectCatalogTables.java
@@ -15,47 +15,25 @@
* limitations under the License.
*/
-package org.apache.ignite.internal.table.distributed.replicator.action;
+package org.apache.ignite.internal.table.distributed.replicator;
+
+import org.apache.ignite.internal.catalog.CatalogService;
+import org.apache.ignite.internal.catalog.descriptors.CatalogTableDescriptor;
+import org.jetbrains.annotations.Nullable;
/**
- * Transaction operation type.
+ * Does not perform any table ID translation and should be used after we switch to the Catalog completely
+ * (when there are only Catalog-defined table IDs, no configuration-defined IDs).
*/
-public enum RequestType {
- RW_GET,
-
- RW_GET_ALL,
-
- RW_DELETE,
-
- RW_DELETE_ALL,
-
- RW_DELETE_EXACT,
-
- RW_DELETE_EXACT_ALL,
-
- RW_INSERT,
-
- RW_INSERT_ALL,
-
- RW_UPSERT,
-
- RW_UPSERT_ALL,
-
- RW_REPLACE,
-
- RW_REPLACE_IF_EXIST,
-
- RW_GET_AND_DELETE,
-
- RW_GET_AND_REPLACE,
-
- RW_GET_AND_UPSERT,
-
- RW_SCAN,
-
- RO_GET,
+public class DirectCatalogTables implements CatalogTables {
+ private final CatalogService catalogService;
- RO_GET_ALL,
+ public DirectCatalogTables(CatalogService catalogService) {
+ this.catalogService = catalogService;
+ }
- RO_SCAN
+ @Override
+ public @Nullable CatalogTableDescriptor table(int tableId, long timestamp) {
+ return catalogService.table(tableId, timestamp);
+ }
}
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
index 2ac0c90e26..7bb2587837 100644
--- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
@@ -308,6 +308,7 @@ public class PartitionReplicaListener implements ReplicaListener {
IndexBuilder indexBuilder,
SchemaSyncService schemaSyncService,
CatalogService catalogService,
+ CatalogTables catalogTables,
TablesConfiguration tablesConfig,
PlacementDriver placementDriver
) {
@@ -336,7 +337,7 @@ public class PartitionReplicaListener implements ReplicaListener {
cursors = new ConcurrentSkipListMap<>(IgniteUuid.globalOrderComparator());
- schemaCompatValidator = new SchemaCompatValidator(schemas);
+ schemaCompatValidator = new SchemaCompatValidator(schemas, catalogTables);
TableView tableConfig = findTableView(tablesConfig.tables().value(), tableId);
@@ -389,6 +390,14 @@ public class PartitionReplicaListener implements ReplicaListener {
// Implicit RW scan can be committed locally on a last batch or error.
return appendTxCommand(req.transactionId(), RequestType.RW_SCAN, false, () -> processScanRetrieveBatchAction(req, senderId))
+ .thenCompose(rows -> {
+ if (allElementsAreNull(rows)) {
+ return completedFuture(rows);
+ } else {
+ return validateAtTimestamp(req.transactionId())
+ .thenApply(ignored -> rows);
+ }
+ })
.handle((rows, err) -> {
if (req.full() && (err != null || rows.size() < req.batchSize())) {
releaseTxLocks(req.transactionId());
@@ -784,8 +793,8 @@ public class PartitionReplicaListener implements ReplicaListener {
IgniteUuid cursorId = new IgniteUuid(txId, request.scanId());
- return lockManager.acquire(txId, new LockKey(tableId()), LockMode.S).thenCompose(tblLock ->
- retrieveExactEntriesUntilCursorEmpty(txId, cursorId, batchCount));
+ return lockManager.acquire(txId, new LockKey(tableId()), LockMode.S)
+ .thenCompose(tblLock -> retrieveExactEntriesUntilCursorEmpty(txId, cursorId, batchCount));
}
/**
@@ -1245,7 +1254,7 @@ public class PartitionReplicaListener implements ReplicaListener {
HybridTimestamp currentTimestamp = hybridClock.now();
HybridTimestamp commitTimestamp = commit ? currentTimestamp : null;
- return catalogVersionFor(currentTimestamp)
+ return reliableCatalogVersionFor(currentTimestamp)
.thenApply(catalogVersion -> {
FinishTxCommandBuilder finishTxCmdBldr = MSG_FACTORY.finishTxCommand()
.txId(txId)
@@ -1313,7 +1322,7 @@ public class PartitionReplicaListener implements ReplicaListener {
}
txOps.futures.forEach((opType, futures) -> {
- if (opType == RequestType.RW_GET || opType == RequestType.RW_GET_ALL || opType == RequestType.RW_SCAN) {
+ if (opType.isRwRead()) {
txReadFutures.addAll(futures);
} else {
txUpdateFutures.addAll(futures);
@@ -1339,7 +1348,7 @@ public class PartitionReplicaListener implements ReplicaListener {
return allOffFuturesExceptionIgnored(txUpdateFutures, request).thenCompose(v -> {
long commandTimestamp = hybridClock.nowLong();
- return catalogVersionFor(hybridTimestamp(commandTimestamp))
+ return reliableCatalogVersionFor(hybridTimestamp(commandTimestamp))
.thenCompose(catalogVersion -> {
TxCleanupCommand txCleanupCmd = MSG_FACTORY.txCleanupCommand()
.txId(request.txId())
@@ -1660,9 +1669,9 @@ public class PartitionReplicaListener implements ReplicaListener {
return completedFuture(result);
}
- return updateAllCommand(request, rowIdsToDelete, txCoordinatorId)
- .thenCompose(this::applyUpdateAllCommand)
- .thenApply(v -> result);
+ return validateAtTimestampAndBuildUpdateAllCommand(request, rowIdsToDelete, txCoordinatorId)
+ .thenCompose(this::applyUpdateAllCommand)
+ .thenApply(ignored -> result);
});
}
case RW_INSERT_ALL: {
@@ -1716,7 +1725,7 @@ public class PartitionReplicaListener implements ReplicaListener {
));
return allOf(insertLockFuts)
- .thenCompose(ignored -> updateAllCommand(request, convertedMap, txCoordinatorId))
+ .thenCompose(ignored -> validateAtTimestampAndBuildUpdateAllCommand(request, convertedMap, txCoordinatorId))
.thenCompose(this::applyUpdateAllCommand)
.thenApply(ignored -> {
// Release short term locks.
@@ -1760,7 +1769,7 @@ public class PartitionReplicaListener implements ReplicaListener {
return completedFuture(null);
}
- return updateAllCommand(request, rowsToUpdate, txCoordinatorId)
+ return validateAtTimestampAndBuildUpdateAllCommand(request, rowsToUpdate, txCoordinatorId)
.thenCompose(this::applyUpdateAllCommand)
.thenRun(() -> {
// Release short term locks.
@@ -1785,7 +1794,7 @@ public class PartitionReplicaListener implements ReplicaListener {
* @param txCoordinatorId Transaction coordinator id.
* @return Listener response.
*/
- private CompletableFuture<Object> processMultiEntryAction(ReadWriteMultiRowPkReplicaRequest request, String txCoordinatorId) {
+ private CompletableFuture<?> processMultiEntryAction(ReadWriteMultiRowPkReplicaRequest request, String txCoordinatorId) {
UUID txId = request.transactionId();
TablePartitionId committedPartitionId = request.commitPartitionId().asTablePartitionId();
List<BinaryTuple> primaryKeys = resolvePks(request.primaryKeys());
@@ -1809,14 +1818,19 @@ public class PartitionReplicaListener implements ReplicaListener {
}
return allOf(rowFuts)
- .thenApply(ignored -> {
+ .thenCompose(ignored -> {
var result = new ArrayList<BinaryRow>(primaryKeys.size());
for (CompletableFuture<BinaryRow> rowFut : rowFuts) {
result.add(rowFut.join());
}
- return result;
+ if (allElementsAreNull(result)) {
+ return completedFuture(result);
+ }
+
+ return validateAtTimestamp(txId)
+ .thenApply(unused -> result);
});
}
case RW_DELETE_ALL: {
@@ -1852,7 +1866,7 @@ public class PartitionReplicaListener implements ReplicaListener {
return completedFuture(result);
}
- return updateAllCommand(request, rowIdsToDelete, txCoordinatorId)
+ return validateAtTimestampAndBuildUpdateAllCommand(request, rowIdsToDelete, txCoordinatorId)
.thenCompose(this::applyUpdateAllCommand)
.thenApply(ignored -> result);
});
@@ -1864,6 +1878,16 @@ public class PartitionReplicaListener implements ReplicaListener {
}
}
+ private static <T> boolean allElementsAreNull(List<T> list) {
+ for (T element : list) {
+ if (element != null) {
+ return false;
+ }
+ }
+
+ return true;
+ }
+
/**
* Executes a command and handles exceptions. A result future can be finished with exception by following rules:
* <ul>
@@ -2001,9 +2025,9 @@ public class PartitionReplicaListener implements ReplicaListener {
return completedFuture(false);
}
- return updateCommand(request, validatedRowId.uuid(), null, txCoordinatorId)
+ return validateAtTimestampAndBuildUpdateCommand(request, validatedRowId.uuid(), null, txCoordinatorId)
.thenCompose(this::applyUpdateCommand)
- .thenApply(ignored -> true);
+ .thenApply(ignored -> (Boolean) true);
});
});
}
@@ -2016,13 +2040,15 @@ public class PartitionReplicaListener implements ReplicaListener {
RowId rowId0 = new RowId(partId(), UUID.randomUUID());
return takeLocksForInsert(searchRow, rowId0, txId)
- .thenCompose(rowIdLock -> updateCommand(request, rowId0.uuid(), searchRow, txCoordinatorId)
+ .thenCompose(rowIdLock -> validateAtTimestampAndBuildUpdateCommand(request, rowId0.uuid(), searchRow,
+ txCoordinatorId
+ )
.thenCompose(this::applyUpdateCommand)
.thenApply(v -> {
// Release short term locks.
rowIdLock.get2().forEach(lock -> lockManager.release(lock.txId(), lock.lockKey(), lock.lockMode()));
- return true;
+ return (Boolean) true;
}));
});
}
@@ -2037,12 +2063,17 @@ public class PartitionReplicaListener implements ReplicaListener {
: takeLocksForUpdate(searchRow, rowId0, txId);
return lockFut
- .thenCompose(rowIdLock -> updateCommand(request, rowId0.uuid(), searchRow, txCoordinatorId)
+ .thenCompose(rowIdLock -> validateAtTimestampAndBuildUpdateCommand(request, rowId0.uuid(), searchRow,
+ txCoordinatorId
+ )
.thenCompose(this::applyUpdateCommand)
- .thenRun(() -> {
+ .thenApply(ignored -> rowIdLock))
+ .thenApply(rowIdLock -> {
// Release short term locks.
rowIdLock.get2().forEach(lock -> lockManager.release(lock.txId(), lock.lockKey(), lock.lockMode()));
- }));
+
+ return null;
+ });
});
}
case RW_GET_AND_UPSERT: {
@@ -2056,7 +2087,9 @@ public class PartitionReplicaListener implements ReplicaListener {
: takeLocksForUpdate(searchRow, rowId0, txId);
return lockFut
- .thenCompose(rowIdLock -> updateCommand(request, rowId0.uuid(), searchRow, txCoordinatorId)
+ .thenCompose(rowIdLock -> validateAtTimestampAndBuildUpdateCommand(request, rowId0.uuid(), searchRow,
+ txCoordinatorId
+ )
.thenCompose(this::applyUpdateCommand)
.thenApply(v -> {
// Release short term locks.
@@ -2073,7 +2106,9 @@ public class PartitionReplicaListener implements ReplicaListener {
}
return takeLocksForUpdate(searchRow, rowId, txId)
- .thenCompose(rowIdLock -> updateCommand(request, rowId.uuid(), searchRow, txCoordinatorId)
+ .thenCompose(rowIdLock -> validateAtTimestampAndBuildUpdateCommand(request, rowId.uuid(), searchRow,
+ txCoordinatorId
+ )
.thenCompose(this::applyUpdateCommand)
.thenApply(v -> {
// Release short term locks.
@@ -2090,13 +2125,15 @@ public class PartitionReplicaListener implements ReplicaListener {
}
return takeLocksForUpdate(searchRow, rowId, txId)
- .thenCompose(rowIdLock -> updateCommand(request, rowId.uuid(), searchRow, txCoordinatorId)
+ .thenCompose(rowIdLock -> validateAtTimestampAndBuildUpdateCommand(request, rowId.uuid(), searchRow,
+ txCoordinatorId
+ )
.thenCompose(this::applyUpdateCommand)
.thenApply(v -> {
// Release short term locks.
rowIdLock.get2().forEach(lock -> lockManager.release(lock.txId(), lock.lockKey(), lock.lockMode()));
- return true;
+ return (Boolean) true;
}));
});
}
@@ -2130,6 +2167,7 @@ public class PartitionReplicaListener implements ReplicaListener {
}
return takeLocksForGet(rowId, txId)
+ .thenCompose(ignored -> validateAtTimestamp(txId))
.thenApply(ignored -> row);
});
}
@@ -2140,9 +2178,9 @@ public class PartitionReplicaListener implements ReplicaListener {
}
return takeLocksForDelete(row, rowId, txId)
- .thenCompose(ignored -> updateCommand(request, rowId.uuid(), null, txCoordinatorId))
+ .thenCompose(rowLock -> validateAtTimestampAndBuildUpdateCommand(request, rowId.uuid(), null, txCoordinatorId))
.thenCompose(this::applyUpdateCommand)
- .thenApply(ignored -> true);
+ .thenApply(ignored -> (Boolean) true);
});
}
case RW_GET_AND_DELETE: {
@@ -2152,7 +2190,7 @@ public class PartitionReplicaListener implements ReplicaListener {
}
return takeLocksForDelete(row, rowId, txId)
- .thenCompose(ignored -> updateCommand(request, rowId.uuid(), null, txCoordinatorId))
+ .thenCompose(ignored -> validateAtTimestampAndBuildUpdateCommand(request, rowId.uuid(), null, txCoordinatorId))
.thenCompose(this::applyUpdateCommand)
.thenApply(ignored -> row);
});
@@ -2310,7 +2348,10 @@ public class PartitionReplicaListener implements ReplicaListener {
* @param txCoordinatorId Transaction coordinator id.
* @return Listener response.
*/
- private CompletableFuture<Boolean> processTwoEntriesAction(ReadWriteSwapRowReplicaRequest request, String txCoordinatorId) {
+ private CompletableFuture<Boolean> processTwoEntriesAction(
+ ReadWriteSwapRowReplicaRequest request,
+ String txCoordinatorId
+ ) {
BinaryRow newRow = request.binaryRow();
BinaryRow expectedRow = request.oldBinaryRow();
TablePartitionIdMessage commitPartitionId = request.commitPartitionId();
@@ -2331,8 +2372,8 @@ public class PartitionReplicaListener implements ReplicaListener {
return completedFuture(false);
}
- return updateCommand(commitPartitionId, validatedRowId.get1().uuid(), newRow, txId, request.full(),
- txCoordinatorId
+ return validateAtTimestampAndBuildUpdateCommand(commitPartitionId.asTablePartitionId(),
+ validatedRowId.get1().uuid(), newRow, txId, request.full(), txCoordinatorId
)
.thenCompose(this::applyUpdateCommand)
.thenApply(ignored -> validatedRowId)
@@ -2340,7 +2381,7 @@ public class PartitionReplicaListener implements ReplicaListener {
// Release short term locks.
rowIdLock.get2().forEach(lock -> lockManager.release(lock.txId(), lock.lockKey(), lock.lockMode()));
- return true;
+ return (Boolean) true;
});
});
});
@@ -2524,120 +2565,213 @@ public class PartitionReplicaListener implements ReplicaListener {
});
}
- private CompletableFuture<UpdateCommand> updateCommand(
+ private CompletableFuture<Void> validateAtTimestamp(UUID txId) {
+ HybridTimestamp operationTimestamp = hybridClock.now();
+
+ return schemaSyncService.waitForMetadataCompleteness(operationTimestamp)
+ .thenApply(unused -> {
+ failIfSchemaChangedSinceTxStart(txId, operationTimestamp);
+
+ return null;
+ });
+ }
+
+ /**
+ * Chooses operation timestamp, makes validations that require it and constructs an {@link UpdateCommand} object.
+ *
+ * @param request Request that is being processed.
+ * @param rowUuid Row UUID.
+ * @param row Row.
+ * @param txCoordinatorId Transaction coordinator id.
+ * @return Future that will complete with the constructed {@link UpdateCommand} object.
+ */
+ private CompletableFuture<UpdateCommand> validateAtTimestampAndBuildUpdateCommand(
ReadWriteSingleRowReplicaRequest request,
UUID rowUuid,
@Nullable BinaryRow row,
String txCoordinatorId
) {
- return updateCommand(request.commitPartitionId(), rowUuid, row, request.transactionId(), request.full(), txCoordinatorId);
+ return validateAtTimestampAndBuildUpdateCommand(
+ request.commitPartitionId().asTablePartitionId(),
+ rowUuid,
+ row,
+ request.transactionId(),
+ request.full(),
+ txCoordinatorId
+ );
}
- private CompletableFuture<UpdateCommand> updateCommand(
+ /**
+ * Chooses operation timestamp, makes validations that require it and constructs an {@link UpdateCommand} object.
+ *
+ * @param request Request that is being processed.
+ * @param rowUuid Row UUID.
+ * @param row Row.
+ * @param txCoordinatorId Transaction coordinator id.
+ * @return Future that will complete with the constructed {@link UpdateCommand} object.
+ */
+ private CompletableFuture<UpdateCommand> validateAtTimestampAndBuildUpdateCommand(
ReadWriteSingleRowPkReplicaRequest request,
UUID rowUuid,
@Nullable BinaryRow row,
String txCoordinatorId
) {
- return updateCommand(request.commitPartitionId(), rowUuid, row, request.transactionId(), request.full(), txCoordinatorId);
+ return validateAtTimestampAndBuildUpdateCommand(
+ request.commitPartitionId().asTablePartitionId(),
+ rowUuid,
+ row,
+ request.transactionId(),
+ request.full(),
+ txCoordinatorId
+ );
}
/**
- * Method to construct {@link UpdateCommand} object.
+ * Chooses operation timestamp, makes validations that require it and constructs an {@link UpdateCommand} object.
*
- * @param tablePartId {@link TablePartitionId} object to construct {@link UpdateCommand} object with.
+ * @param tablePartId {@link TablePartitionId} object.
* @param rowUuid Row UUID.
* @param row Row.
* @param txId Transaction ID.
* @param full {@code True} if this is a full transaction.
* @param txCoordinatorId Transaction coordinator id.
- * @return Constructed {@link UpdateCommand} object.
+ * @return Future that will complete with the constructed {@link UpdateCommand} object.
*/
- private CompletableFuture<UpdateCommand> updateCommand(
- TablePartitionIdMessage tablePartId,
+ private CompletableFuture<UpdateCommand> validateAtTimestampAndBuildUpdateCommand(
+ TablePartitionId tablePartId,
UUID rowUuid,
@Nullable BinaryRow row,
UUID txId,
boolean full,
String txCoordinatorId
) {
- long commandTimestamp = hybridClock.nowLong();
+ HybridTimestamp operationTimestamp = hybridClock.now();
- return catalogVersionFor(hybridTimestamp(commandTimestamp))
+ return reliableCatalogVersionFor(operationTimestamp)
.thenApply(catalogVersion -> {
- UpdateCommandBuilder bldr = MSG_FACTORY.updateCommand()
- .tablePartitionId(tablePartId)
- .rowUuid(rowUuid)
- .txId(txId)
- .full(full)
- .safeTimeLong(commandTimestamp)
- .txCoordinatorId(txCoordinatorId)
- .requiredCatalogVersion(catalogVersion);
+ failIfSchemaChangedSinceTxStart(txId, operationTimestamp);
- if (row != null) {
- BinaryRowMessage rowMessage = MSG_FACTORY.binaryRowMessage()
- .binaryTuple(row.tupleSlice())
- .schemaVersion(row.schemaVersion())
- .build();
+ return updateCommand(tablePartId, rowUuid, row, txId, full, txCoordinatorId, operationTimestamp, catalogVersion);
+ });
+ }
- bldr.rowMessage(rowMessage);
- }
+ private static UpdateCommand updateCommand(
+ TablePartitionId tablePartId,
+ UUID rowUuid,
+ @Nullable BinaryRow row,
+ UUID txId,
+ boolean full,
+ String txCoordinatorId,
+ HybridTimestamp operationTimestamp,
+ int catalogVersion
+ ) {
+ UpdateCommandBuilder bldr = MSG_FACTORY.updateCommand()
+ .tablePartitionId(tablePartitionId(tablePartId))
+ .rowUuid(rowUuid)
+ .txId(txId)
+ .full(full)
+ .safeTimeLong(operationTimestamp.longValue())
+ .txCoordinatorId(txCoordinatorId)
+ .requiredCatalogVersion(catalogVersion);
+
+ if (row != null) {
+ BinaryRowMessage rowMessage = MSG_FACTORY.binaryRowMessage()
+ .binaryTuple(row.tupleSlice())
+ .schemaVersion(row.schemaVersion())
+ .build();
+
+ bldr.rowMessage(rowMessage);
+ }
- return bldr.build();
- });
+ return bldr.build();
}
- private CompletableFuture<Integer> catalogVersionFor(HybridTimestamp ts) {
+ private void failIfSchemaChangedSinceTxStart(UUID txId, HybridTimestamp operationTimestamp) {
+ schemaCompatValidator.failIfSchemaChangedAfterTxStart(txId, operationTimestamp, tableId());
+ }
+
+ private CompletableFuture<Integer> reliableCatalogVersionFor(HybridTimestamp ts) {
return schemaSyncService.waitForMetadataCompleteness(ts)
.thenApply(unused -> catalogService.activeCatalogVersion(ts.longValue()));
}
- private CompletableFuture<UpdateAllCommand> updateAllCommand(
+ /**
+ * Chooses operation timestamp, makes validations that require it and constructs an {@link UpdateAllCommand} object.
+ *
+ * @param request Request that is being processed.
+ * @param rowsToUpdate All {@link BinaryRow}s represented as {@link ByteBuffer}s to be updated.
+ * @param txCoordinatorId Transaction coordinator id.
+ * @return Future that will complete with the constructed {@link UpdateAllCommand} object.
+ */
+ private CompletableFuture<UpdateAllCommand> validateAtTimestampAndBuildUpdateAllCommand(
ReadWriteMultiRowReplicaRequest request,
Map<UUID, BinaryRowMessage> rowsToUpdate,
String txCoordinatorId
) {
- return updateAllCommand(rowsToUpdate, request.commitPartitionId(), request.transactionId(), request.full(), txCoordinatorId);
+ return validateAtTimestampAndBuildUpdateAllCommand(
+ rowsToUpdate,
+ request.commitPartitionId(),
+ request.transactionId(),
+ request.full(),
+ txCoordinatorId
+ );
}
- private CompletableFuture<UpdateAllCommand> updateAllCommand(
+ /**
+ * Chooses operation timestamp, makes validations that require it and constructs an {@link UpdateAllCommand} object.
+ *
+ * @param request Request that is being processed.
+ * @param rowsToUpdate All {@link BinaryRow}s represented as {@link ByteBuffer}s to be updated.
+ * @param txCoordinatorId Transaction coordinator id.
+ * @return Future that will complete with the constructed {@link UpdateAllCommand} object.
+ */
+ private CompletableFuture<UpdateAllCommand> validateAtTimestampAndBuildUpdateAllCommand(
ReadWriteMultiRowPkReplicaRequest request,
Map<UUID, BinaryRowMessage> rowsToUpdate,
String txCoordinatorId
) {
- return updateAllCommand(rowsToUpdate, request.commitPartitionId(), request.transactionId(), request.full(), txCoordinatorId);
+ return validateAtTimestampAndBuildUpdateAllCommand(
+ rowsToUpdate,
+ request.commitPartitionId(),
+ request.transactionId(),
+ request.full(),
+ txCoordinatorId
+ );
}
/**
- * Method to construct {@link UpdateAllCommand} object.
+ * Chooses operation timestamp, makes validations that require it and constructs an {@link UpdateAllCommand} object.
*
* @param rowsToUpdate All {@link BinaryRow}s represented as {@link ByteBuffer}s to be updated.
* @param commitPartitionId Partition ID that these rows belong to.
* @param transactionId Transaction ID.
* @param full {@code true} if this is a single-command transaction.
* @param txCoordinatorId Transaction coordinator id.
- * @return Constructed {@link UpdateAllCommand} object.
+ * @return Future that will complete with the constructed {@link UpdateAllCommand} object.
*/
- private CompletableFuture<UpdateAllCommand> updateAllCommand(
+ private CompletableFuture<UpdateAllCommand> validateAtTimestampAndBuildUpdateAllCommand(
Map<UUID, BinaryRowMessage> rowsToUpdate,
TablePartitionIdMessage commitPartitionId,
UUID transactionId,
boolean full,
String txCoordinatorId
) {
- long commandTimestamp = hybridClock.nowLong();
-
- return catalogVersionFor(hybridTimestamp(commandTimestamp))
- .thenApply(catalogVersion -> MSG_FACTORY.updateAllCommand()
- .tablePartitionId(commitPartitionId)
- .rowsToUpdate(rowsToUpdate)
- .txId(transactionId)
- .safeTimeLong(commandTimestamp)
- .full(full)
- .txCoordinatorId(txCoordinatorId)
- .requiredCatalogVersion(catalogVersion)
- .build()
- );
+ HybridTimestamp operationTimestamp = hybridClock.now();
+
+ return reliableCatalogVersionFor(operationTimestamp)
+ .thenApply(catalogVersion -> {
+ failIfSchemaChangedSinceTxStart(transactionId, operationTimestamp);
+
+ return MSG_FACTORY.updateAllCommand()
+ .tablePartitionId(commitPartitionId)
+ .rowsToUpdate(rowsToUpdate)
+ .txId(transactionId)
+ .safeTimeLong(operationTimestamp.longValue())
+ .full(full)
+ .txCoordinatorId(txCoordinatorId)
+ .requiredCatalogVersion(catalogVersion)
+ .build();
+ });
}
/**
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/SchemaCompatValidator.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/SchemaCompatValidator.java
index 738c85f3f3..26ef6d8811 100644
--- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/SchemaCompatValidator.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/SchemaCompatValidator.java
@@ -23,6 +23,7 @@ import java.util.List;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
+import org.apache.ignite.internal.catalog.descriptors.CatalogTableDescriptor;
import org.apache.ignite.internal.hlc.HybridTimestamp;
import org.apache.ignite.internal.replicator.TablePartitionId;
import org.apache.ignite.internal.table.distributed.schema.FullTableSchema;
@@ -36,9 +37,12 @@ import org.jetbrains.annotations.Nullable;
*/
class SchemaCompatValidator {
private final Schemas schemas;
+ private final CatalogTables catalogTables;
- SchemaCompatValidator(Schemas schemas) {
+ /** Constructor. */
+ SchemaCompatValidator(Schemas schemas, CatalogTables catalogTables) {
this.schemas = schemas;
+ this.catalogTables = catalogTables;
}
/**
@@ -165,4 +169,22 @@ class SchemaCompatValidator {
// TODO: IGNITE-19229 - is backward compatibility always symmetric with the forward compatibility?
return isForwardCompatible(newSchema, oldSchema);
}
+
+ void failIfSchemaChangedAfterTxStart(UUID txId, HybridTimestamp operationTimestamp, int tableId) {
+ HybridTimestamp beginTs = TransactionIds.beginTimestamp(txId);
+ CatalogTableDescriptor tableAtBeginTs = catalogTables.table(tableId, beginTs.longValue());
+ CatalogTableDescriptor tableAtOpTs = catalogTables.table(tableId, operationTimestamp.longValue());
+
+ assert tableAtBeginTs != null;
+ assert tableAtOpTs != null;
+
+ if (tableAtOpTs.tableVersion() != tableAtBeginTs.tableVersion()) {
+ throw new IncompatibleSchemaException(
+ String.format(
+ "Table schema was updated after the transaction was started [table=%d, startSchema=%d, operationSchema=%d]",
+ tableId, tableAtBeginTs.tableVersion(), tableAtOpTs.tableVersion()
+ )
+ );
+ }
+ }
}
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/action/RequestType.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/action/RequestType.java
index 6baae3a1c5..f5bc723121 100644
--- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/action/RequestType.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/action/RequestType.java
@@ -57,5 +57,43 @@ public enum RequestType {
RO_GET_ALL,
- RO_SCAN
+ RO_SCAN;
+
+ /**
+ * Returns {@code true} if the operation is an RW read.
+ */
+ public boolean isRwRead() {
+ switch (this) {
+ case RW_GET:
+ case RW_GET_ALL:
+ case RW_SCAN:
+ return true;
+ default:
+ return false;
+ }
+ }
+
+ /**
+ * Returns {@code true} if the operation is a write.
+ */
+ public boolean isWrite() {
+ switch (this) {
+ case RW_DELETE:
+ case RW_DELETE_ALL:
+ case RW_DELETE_EXACT:
+ case RW_DELETE_EXACT_ALL:
+ case RW_INSERT:
+ case RW_INSERT_ALL:
+ case RW_UPSERT:
+ case RW_UPSERT_ALL:
+ case RW_REPLACE:
+ case RW_REPLACE_IF_EXIST:
+ case RW_GET_AND_DELETE:
+ case RW_GET_AND_REPLACE:
+ case RW_GET_AND_UPSERT:
+ return true;
+ default:
+ return false;
+ }
+ }
}
diff --git a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerIndexLockingTest.java b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerIndexLockingTest.java
index d63a88a6fd..2aeb6e1fd4 100644
--- a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerIndexLockingTest.java
+++ b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerIndexLockingTest.java
@@ -26,6 +26,8 @@ import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.allOf;
import static org.hamcrest.Matchers.hasItem;
import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyInt;
+import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
@@ -40,6 +42,7 @@ import java.util.concurrent.CompletableFuture;
import java.util.function.Function;
import org.apache.ignite.distributed.TestPartitionDataStorage;
import org.apache.ignite.internal.catalog.CatalogService;
+import org.apache.ignite.internal.catalog.descriptors.CatalogTableDescriptor;
import org.apache.ignite.internal.configuration.testframework.ConfigurationExtension;
import org.apache.ignite.internal.configuration.testframework.InjectConfiguration;
import org.apache.ignite.internal.hlc.HybridClock;
@@ -81,6 +84,7 @@ import org.apache.ignite.internal.table.distributed.gc.GcUpdateHandler;
import org.apache.ignite.internal.table.distributed.index.IndexBuilder;
import org.apache.ignite.internal.table.distributed.index.IndexUpdateHandler;
import org.apache.ignite.internal.table.distributed.replication.request.BinaryRowMessage;
+import org.apache.ignite.internal.table.distributed.replicator.CatalogTables;
import org.apache.ignite.internal.table.distributed.replicator.PartitionReplicaListener;
import org.apache.ignite.internal.table.distributed.replicator.TransactionStateResolver;
import org.apache.ignite.internal.table.distributed.replicator.action.RequestType;
@@ -194,6 +198,9 @@ public class PartitionReplicaListenerIndexLockingTest extends IgniteAbstractTest
TestPartitionDataStorage partitionDataStorage = new TestPartitionDataStorage(TEST_MV_PARTITION_STORAGE);
+ CatalogTables catalogTables = mock(CatalogTables.class);
+ when(catalogTables.table(anyInt(), anyLong())).thenReturn(mock(CatalogTableDescriptor.class));
+
ClusterNode localNode = mock(ClusterNode.class);
partitionReplicaListener = new PartitionReplicaListener(
@@ -232,6 +239,7 @@ public class PartitionReplicaListenerIndexLockingTest extends IgniteAbstractTest
mock(IndexBuilder.class),
mock(SchemaSyncService.class, invocation -> completedFuture(null)),
mock(CatalogService.class),
+ catalogTables,
tablesConfig,
new TestPlacementDriver(localNode.name())
);
diff --git a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerTest.java b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerTest.java
index dd69c9e333..460826eadb 100644
--- a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerTest.java
+++ b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerTest.java
@@ -39,13 +39,17 @@ import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.junit.jupiter.api.Assertions.fail;
+import static org.mockito.AdditionalMatchers.gt;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyBoolean;
import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.atLeast;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.inOrder;
+import static org.mockito.Mockito.lenient;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
@@ -77,6 +81,7 @@ import org.apache.ignite.internal.binarytuple.BinaryTuplePrefixBuilder;
import org.apache.ignite.internal.catalog.CatalogService;
import org.apache.ignite.internal.catalog.commands.DefaultValue;
import org.apache.ignite.internal.catalog.descriptors.CatalogTableColumnDescriptor;
+import org.apache.ignite.internal.catalog.descriptors.CatalogTableDescriptor;
import org.apache.ignite.internal.configuration.testframework.ConfigurationExtension;
import org.apache.ignite.internal.configuration.testframework.InjectConfiguration;
import org.apache.ignite.internal.hlc.HybridClock;
@@ -133,6 +138,7 @@ import org.apache.ignite.internal.table.distributed.raft.PartitionDataStorage;
import org.apache.ignite.internal.table.distributed.replication.request.BinaryRowMessage;
import org.apache.ignite.internal.table.distributed.replication.request.BinaryTupleMessage;
import org.apache.ignite.internal.table.distributed.replication.request.ReadWriteReplicaRequest;
+import org.apache.ignite.internal.table.distributed.replicator.CatalogTables;
import org.apache.ignite.internal.table.distributed.replicator.IncompatibleSchemaAbortException;
import org.apache.ignite.internal.table.distributed.replicator.IncompatibleSchemaException;
import org.apache.ignite.internal.table.distributed.replicator.LeaderOrTxState;
@@ -147,6 +153,7 @@ import org.apache.ignite.internal.testframework.IgniteAbstractTest;
import org.apache.ignite.internal.tostring.IgniteToStringInclude;
import org.apache.ignite.internal.tostring.S;
import org.apache.ignite.internal.tx.LockManager;
+import org.apache.ignite.internal.tx.TransactionIds;
import org.apache.ignite.internal.tx.TxManager;
import org.apache.ignite.internal.tx.TxMeta;
import org.apache.ignite.internal.tx.TxState;
@@ -177,6 +184,7 @@ import org.junit.jupiter.api.extension.ExtendWith;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;
+import org.junitpioneer.jupiter.cartesian.ArgumentSets;
import org.junitpioneer.jupiter.cartesian.CartesianTest;
import org.junitpioneer.jupiter.cartesian.CartesianTest.Values;
import org.mockito.ArgumentCaptor;
@@ -198,7 +206,9 @@ public class PartitionReplicaListenerTest extends IgniteAbstractTest {
private static final int CURRENT_SCHEMA_VERSION = 1;
- private static final int FUTURE_SCHEMA_VERSION = 2;
+ private static final int NEXT_SCHEMA_VERSION = 2;
+
+ private static final int FUTURE_SCHEMA_VERSION = NEXT_SCHEMA_VERSION;
private static final int FUTURE_SCHEMA_ROW_INDEXED_VALUE = 0;
@@ -287,6 +297,9 @@ public class PartitionReplicaListenerTest extends IgniteAbstractTest {
@Mock
private CatalogService catalogService;
+ @Mock
+ private CatalogTables catalogTables;
+
/** Schema descriptor for tests. */
private SchemaDescriptor schemaDescriptor;
@@ -299,6 +312,18 @@ public class PartitionReplicaListenerTest extends IgniteAbstractTest {
/** Key-value marshaller using schema version 2. */
private KvMarshaller<TestKey, TestValue> kvMarshallerVersion2;
+ private final CatalogTableDescriptor tableDescriptor = new CatalogTableDescriptor(
+ tblId, 1, "table", 1, CURRENT_SCHEMA_VERSION,
+ List.of(
+ new CatalogTableColumnDescriptor("intKey", ColumnType.INT32, false, 0, 0, 0, null),
+ new CatalogTableColumnDescriptor("strKey", ColumnType.STRING, false, 0, 0, 0, null),
+ new CatalogTableColumnDescriptor("intVal", ColumnType.INT32, false, 0, 0, 0, null),
+ new CatalogTableColumnDescriptor("strVal", ColumnType.STRING, false, 0, 0, 0, null)
+ ),
+ List.of("intKey", "strKey"),
+ null
+ );
+
/** Partition replication listener to test. */
private PartitionReplicaListener partitionReplicaListener;
@@ -382,12 +407,14 @@ public class PartitionReplicaListenerTest extends IgniteAbstractTest {
when(schemas.waitForSchemasAvailability(any())).thenReturn(completedFuture(null));
when(schemas.waitForSchemaAvailability(anyInt(), anyInt())).thenReturn(completedFuture(null));
+ lenient().when(catalogTables.table(anyInt(), anyLong())).thenReturn(tableDescriptor);
+
int pkIndexId = 1;
int sortedIndexId = 2;
int hashIndexId = 3;
schemaDescriptor = schemaDescriptorWith(CURRENT_SCHEMA_VERSION);
- schemaDescriptorVersion2 = schemaDescriptorWith(FUTURE_SCHEMA_VERSION);
+ schemaDescriptorVersion2 = schemaDescriptorWith(NEXT_SCHEMA_VERSION);
ColumnsExtractor row2Tuple = BinaryRowConverter.keyExtractor(schemaDescriptor);
@@ -459,6 +486,7 @@ public class PartitionReplicaListenerTest extends IgniteAbstractTest {
mock(IndexBuilder.class),
schemaSyncService,
catalogService,
+ catalogTables,
tablesConfig,
new TestPlacementDriver(localNode.name())
);
@@ -505,7 +533,7 @@ public class PartitionReplicaListenerTest extends IgniteAbstractTest {
CompletableFuture<?> fut = partitionReplicaListener.invoke(TX_MESSAGES_FACTORY.txStateReplicaRequest()
.groupId(grpId)
.readTimestampLong(clock.nowLong())
- .txId(TestTransactionIds.newTransactionId())
+ .txId(newTxId())
.build(), "senderId");
LeaderOrTxState tuple = (LeaderOrTxState) fut.get(1, TimeUnit.SECONDS);
@@ -516,7 +544,7 @@ public class PartitionReplicaListenerTest extends IgniteAbstractTest {
@Test
public void testTxStateReplicaRequestCommitState() throws Exception {
- UUID txId = TestTransactionIds.newTransactionId();
+ UUID txId = newTxId();
txStateStorage.put(txId, new TxMeta(TxState.COMMITED, singletonList(grpId), clock.now()));
@@ -546,7 +574,7 @@ public class PartitionReplicaListenerTest extends IgniteAbstractTest {
CompletableFuture<?> fut = partitionReplicaListener.invoke(TX_MESSAGES_FACTORY.txStateReplicaRequest()
.groupId(grpId)
.readTimestampLong(clock.nowLong())
- .txId(TestTransactionIds.newTransactionId())
+ .txId(newTxId())
.build(), localNode.id());
LeaderOrTxState tuple = (LeaderOrTxState) fut.get(1, TimeUnit.SECONDS);
@@ -573,7 +601,7 @@ public class PartitionReplicaListenerTest extends IgniteAbstractTest {
@Test
public void testReadOnlySingleRowReplicaRequestCommittedResult() throws Exception {
- UUID txId = TestTransactionIds.newTransactionId();
+ UUID txId = newTxId();
BinaryRow testBinaryKey = nextBinaryKey();
BinaryRow testBinaryRow = binaryRow(key(testBinaryKey), new TestValue(1, "v1"));
var rowId = new RowId(partId);
@@ -596,7 +624,7 @@ public class PartitionReplicaListenerTest extends IgniteAbstractTest {
@Test
public void testReadOnlySingleRowReplicaRequestResolveWriteIntentCommitted() throws Exception {
- UUID txId = TestTransactionIds.newTransactionId();
+ UUID txId = newTxId();
BinaryRow testBinaryKey = nextBinaryKey();
BinaryRow testBinaryRow = binaryRow(key(testBinaryKey), new TestValue(1, "v1"));
var rowId = new RowId(partId);
@@ -619,7 +647,7 @@ public class PartitionReplicaListenerTest extends IgniteAbstractTest {
@Test
public void testReadOnlySingleRowReplicaRequestResolveWriteIntentPending() throws Exception {
- UUID txId = TestTransactionIds.newTransactionId();
+ UUID txId = newTxId();
BinaryRow testBinaryKey = nextBinaryKey();
BinaryRow testBinaryRow = binaryRow(key(testBinaryKey), new TestValue(1, "v1"));
var rowId = new RowId(partId);
@@ -641,7 +669,7 @@ public class PartitionReplicaListenerTest extends IgniteAbstractTest {
@Test
public void testReadOnlySingleRowReplicaRequestResolveWriteIntentAborted() throws Exception {
- UUID txId = TestTransactionIds.newTransactionId();
+ UUID txId = newTxId();
BinaryRow testBinaryKey = nextBinaryKey();
BinaryRow testBinaryRow = binaryRow(key(testBinaryKey), new TestValue(1, "v1"));
var rowId = new RowId(partId);
@@ -664,7 +692,7 @@ public class PartitionReplicaListenerTest extends IgniteAbstractTest {
@Test
public void testWriteScanRetrieveBatchReplicaRequestWithSortedIndex() throws Exception {
- UUID txId = TestTransactionIds.newTransactionId();
+ UUID txId = newTxId();
int sortedIndexId = sortedIndexStorage.id();
IntStream.range(0, 6).forEach(i -> {
@@ -681,7 +709,7 @@ public class PartitionReplicaListenerTest extends IgniteAbstractTest {
testMvPartitionStorage.commitWrite(rowId, clock.now());
});
- UUID scanTxId = TestTransactionIds.newTransactionId();
+ UUID scanTxId = newTxId();
// Request first batch
CompletableFuture<?> fut = partitionReplicaListener.invoke(TABLE_MESSAGES_FACTORY.readWriteScanRetrieveBatchReplicaRequest()
@@ -718,7 +746,7 @@ public class PartitionReplicaListenerTest extends IgniteAbstractTest {
// Request bounded.
fut = partitionReplicaListener.invoke(TABLE_MESSAGES_FACTORY.readWriteScanRetrieveBatchReplicaRequest()
.groupId(grpId)
- .transactionId(TestTransactionIds.newTransactionId())
+ .transactionId(newTxId())
.timestampLong(clock.nowLong())
.term(1L)
.scanId(2L)
@@ -737,7 +765,7 @@ public class PartitionReplicaListenerTest extends IgniteAbstractTest {
// Empty result.
fut = partitionReplicaListener.invoke(TABLE_MESSAGES_FACTORY.readWriteScanRetrieveBatchReplicaRequest()
.groupId(grpId)
- .transactionId(TestTransactionIds.newTransactionId())
+ .transactionId(newTxId())
.timestampLong(clock.nowLong())
.term(1L)
.scanId(2L)
@@ -754,7 +782,7 @@ public class PartitionReplicaListenerTest extends IgniteAbstractTest {
// Lookup.
fut = partitionReplicaListener.invoke(TABLE_MESSAGES_FACTORY.readWriteScanRetrieveBatchReplicaRequest()
.groupId(grpId)
- .transactionId(TestTransactionIds.newTransactionId())
+ .transactionId(newTxId())
.timestampLong(clock.nowLong())
.term(1L)
.scanId(2L)
@@ -771,7 +799,7 @@ public class PartitionReplicaListenerTest extends IgniteAbstractTest {
@Test
public void testReadOnlyScanRetrieveBatchReplicaRequestSortedIndex() throws Exception {
- UUID txId = TestTransactionIds.newTransactionId();
+ UUID txId = newTxId();
int sortedIndexId = sortedIndexStorage.id();
IntStream.range(0, 6).forEach(i -> {
@@ -788,7 +816,7 @@ public class PartitionReplicaListenerTest extends IgniteAbstractTest {
testMvPartitionStorage.commitWrite(rowId, clock.now());
});
- UUID scanTxId = TestTransactionIds.newTransactionId();
+ UUID scanTxId = newTxId();
// Request first batch
CompletableFuture<?> fut = partitionReplicaListener.invoke(TABLE_MESSAGES_FACTORY.readOnlyScanRetrieveBatchReplicaRequest()
@@ -823,7 +851,7 @@ public class PartitionReplicaListenerTest extends IgniteAbstractTest {
// Request bounded.
fut = partitionReplicaListener.invoke(TABLE_MESSAGES_FACTORY.readOnlyScanRetrieveBatchReplicaRequest()
.groupId(grpId)
- .transactionId(TestTransactionIds.newTransactionId())
+ .transactionId(newTxId())
.readTimestampLong(clock.nowLong())
.scanId(2L)
.indexToUse(sortedIndexId)
@@ -841,7 +869,7 @@ public class PartitionReplicaListenerTest extends IgniteAbstractTest {
// Empty result.
fut = partitionReplicaListener.invoke(TABLE_MESSAGES_FACTORY.readOnlyScanRetrieveBatchReplicaRequest()
.groupId(grpId)
- .transactionId(TestTransactionIds.newTransactionId())
+ .transactionId(newTxId())
.readTimestampLong(clock.nowLong())
.scanId(2L)
.indexToUse(sortedIndexId)
@@ -857,7 +885,7 @@ public class PartitionReplicaListenerTest extends IgniteAbstractTest {
// Lookup.
fut = partitionReplicaListener.invoke(TABLE_MESSAGES_FACTORY.readOnlyScanRetrieveBatchReplicaRequest()
.groupId(grpId)
- .transactionId(TestTransactionIds.newTransactionId())
+ .transactionId(newTxId())
.readTimestampLong(clock.nowLong())
.scanId(2L)
.indexToUse(sortedIndexId)
@@ -873,7 +901,7 @@ public class PartitionReplicaListenerTest extends IgniteAbstractTest {
@Test
public void testReadOnlyScanRetrieveBatchReplicaRequstHashIndex() throws Exception {
- UUID txId = TestTransactionIds.newTransactionId();
+ UUID txId = newTxId();
int hashIndexId = hashIndexStorage.id();
IntStream.range(0, 7).forEach(i -> {
@@ -890,7 +918,7 @@ public class PartitionReplicaListenerTest extends IgniteAbstractTest {
testMvPartitionStorage.commitWrite(rowId, clock.now());
});
- UUID scanTxId = TestTransactionIds.newTransactionId();
+ UUID scanTxId = newTxId();
// Request first batch
CompletableFuture<?> fut = partitionReplicaListener.invoke(TABLE_MESSAGES_FACTORY.readOnlyScanRetrieveBatchReplicaRequest()
@@ -927,7 +955,7 @@ public class PartitionReplicaListenerTest extends IgniteAbstractTest {
// Empty result.
fut = partitionReplicaListener.invoke(TABLE_MESSAGES_FACTORY.readOnlyScanRetrieveBatchReplicaRequest()
.groupId(grpId)
- .transactionId(TestTransactionIds.newTransactionId())
+ .transactionId(newTxId())
.readTimestampLong(clock.nowLong())
.scanId(2L)
.indexToUse(hashIndexId)
@@ -943,7 +971,7 @@ public class PartitionReplicaListenerTest extends IgniteAbstractTest {
// Lookup.
fut = partitionReplicaListener.invoke(TABLE_MESSAGES_FACTORY.readOnlyScanRetrieveBatchReplicaRequest()
.groupId(grpId)
- .transactionId(TestTransactionIds.newTransactionId())
+ .transactionId(newTxId())
.readTimestampLong(clock.nowLong())
.scanId(2L)
.indexToUse(hashIndexId)
@@ -959,7 +987,7 @@ public class PartitionReplicaListenerTest extends IgniteAbstractTest {
@Test
public void testWriteIntentOnPrimaryReplicaInsertUpdateDelete() throws MarshallerException {
- UUID txId = TestTransactionIds.newTransactionId();
+ UUID txId = newTxId();
BinaryRow testRow = binaryRow(0);
BinaryRow testRowPk = kvMarshaller.marshal(new TestKey(0, "k0"));
@@ -1011,7 +1039,7 @@ public class PartitionReplicaListenerTest extends IgniteAbstractTest {
@Test
public void testWriteIntentOnPrimaryReplicaMultiRowOps() throws MarshallerException {
- UUID txId = TestTransactionIds.newTransactionId();
+ UUID txId = newTxId();
BinaryRow row0 = binaryRow(0);
BinaryRow row1 = binaryRow(1);
Collection<BinaryRow> rows = asList(row0, row1);
@@ -1058,6 +1086,10 @@ public class PartitionReplicaListenerTest extends IgniteAbstractTest {
}
private CompletableFuture<?> doSingleRowRequest(UUID txId, BinaryRow binaryRow, RequestType requestType) {
+ return doSingleRowRequest(txId, binaryRow, requestType, false);
+ }
+
+ private CompletableFuture<?> doSingleRowRequest(UUID txId, BinaryRow binaryRow, RequestType requestType, boolean full) {
return partitionReplicaListener.invoke(TABLE_MESSAGES_FACTORY.readWriteSingleRowReplicaRequest()
.groupId(grpId)
.transactionId(txId)
@@ -1065,6 +1097,7 @@ public class PartitionReplicaListenerTest extends IgniteAbstractTest {
.binaryRowMessage(binaryRowMessage(binaryRow))
.term(1L)
.commitPartitionId(commitPartitionId())
+ .full(full)
.build(),
localNode.id()
);
@@ -1091,6 +1124,10 @@ public class PartitionReplicaListenerTest extends IgniteAbstractTest {
}
private CompletableFuture<?> doMultiRowRequest(UUID txId, Collection<BinaryRow> binaryRows, RequestType requestType) {
+ return doMultiRowRequest(txId, binaryRows, requestType, false);
+ }
+
+ private CompletableFuture<?> doMultiRowRequest(UUID txId, Collection<BinaryRow> binaryRows, RequestType requestType, boolean full) {
return partitionReplicaListener.invoke(TABLE_MESSAGES_FACTORY.readWriteMultiRowReplicaRequest()
.groupId(grpId)
.transactionId(txId)
@@ -1098,12 +1135,17 @@ public class PartitionReplicaListenerTest extends IgniteAbstractTest {
.binaryRowMessages(binaryRows.stream().map(PartitionReplicaListenerTest::binaryRowMessage).collect(toList()))
.term(1L)
.commitPartitionId(commitPartitionId())
+ .full(full)
.build(),
localNode.id()
);
}
private CompletableFuture<?> doMultiRowPkRequest(UUID txId, Collection<BinaryRow> binaryRows, RequestType requestType) {
+ return doMultiRowPkRequest(txId, binaryRows, requestType, false);
+ }
+
+ private CompletableFuture<?> doMultiRowPkRequest(UUID txId, Collection<BinaryRow> binaryRows, RequestType requestType, boolean full) {
return partitionReplicaListener.invoke(TABLE_MESSAGES_FACTORY.readWriteMultiRowPkReplicaRequest()
.groupId(grpId)
.transactionId(txId)
@@ -1111,6 +1153,7 @@ public class PartitionReplicaListenerTest extends IgniteAbstractTest {
.primaryKeys(binaryRows.stream().map(BinaryRow::tupleSlice).collect(toList()))
.term(1L)
.commitPartitionId(commitPartitionId())
+ .full(full)
.build(),
localNode.id()
);
@@ -1118,7 +1161,7 @@ public class PartitionReplicaListenerTest extends IgniteAbstractTest {
@Test
public void testWriteIntentOnPrimaryReplicaSingleUpdate() {
- UUID txId = TestTransactionIds.newTransactionId();
+ UUID txId = newTxId();
AtomicInteger counter = new AtomicInteger();
testWriteIntentOnPrimaryReplica(
@@ -1143,7 +1186,7 @@ public class PartitionReplicaListenerTest extends IgniteAbstractTest {
@Test
public void testWriteIntentOnPrimaryReplicaUpdateAll() {
- UUID txId = TestTransactionIds.newTransactionId();
+ UUID txId = newTxId();
AtomicInteger counter = new AtomicInteger();
testWriteIntentOnPrimaryReplica(
@@ -1278,7 +1321,7 @@ public class PartitionReplicaListenerTest extends IgniteAbstractTest {
BinaryRow br2Pk = kvMarshaller.marshal(new TestKey(2, "k" + 2));
if (insertFirst) {
- UUID tx0 = beginTx();
+ UUID tx0 = newTxId();
upsert(tx0, br1);
upsert(tx0, br2);
cleanup(tx0);
@@ -1286,7 +1329,7 @@ public class PartitionReplicaListenerTest extends IgniteAbstractTest {
txState = null;
- UUID tx1 = beginTx();
+ UUID tx1 = newTxId();
delete(tx1, br1Pk);
upsert(tx1, br1);
@@ -1540,7 +1583,7 @@ public class PartitionReplicaListenerTest extends IgniteAbstractTest {
private static Stream<Arguments> singleRowRequestTypes() {
return Arrays.stream(RequestType.values())
- .filter(RequestTypes::isSingleRow)
+ .filter(RequestTypes::isSingleRowRw)
.map(Arguments::of);
}
@@ -1592,7 +1635,7 @@ public class PartitionReplicaListenerTest extends IgniteAbstractTest {
private static Stream<Arguments> multiRowsRequestTypes() {
return Arrays.stream(RequestType.values())
- .filter(RequestTypes::isMultipleRows)
+ .filter(RequestTypes::isMultipleRowsRw)
.map(Arguments::of);
}
@@ -1608,6 +1651,10 @@ public class PartitionReplicaListenerTest extends IgniteAbstractTest {
}
private CompletableFuture<?> doReplaceRequest(UUID targetTxId, BinaryRow oldRow, BinaryRow newRow) {
+ return doReplaceRequest(targetTxId, oldRow, newRow, false);
+ }
+
+ private CompletableFuture<?> doReplaceRequest(UUID targetTxId, BinaryRow oldRow, BinaryRow newRow, boolean full) {
return partitionReplicaListener.invoke(TABLE_MESSAGES_FACTORY.readWriteSwapRowReplicaRequest()
.groupId(grpId)
.transactionId(targetTxId)
@@ -1616,6 +1663,7 @@ public class PartitionReplicaListenerTest extends IgniteAbstractTest {
.binaryRowMessage(binaryRowMessage(newRow))
.term(1L)
.commitPartitionId(commitPartitionId())
+ .full(full)
.build(),
localNode.id()
);
@@ -1659,16 +1707,21 @@ public class PartitionReplicaListenerTest extends IgniteAbstractTest {
@Test
public void failsWhenFullScanReadsTupleWithIncompatibleSchemaFromFuture() {
testFailsWhenReadingFromFutureIncompatibleSchema(
- (targetTxId, key) -> partitionReplicaListener.invoke(
- TABLE_MESSAGES_FACTORY.readWriteScanRetrieveBatchReplicaRequest()
- .groupId(grpId)
- .transactionId(targetTxId)
- .term(1L)
- .scanId(1)
- .batchSize(100)
- .build(),
- localNode.id()
- )
+ (targetTxId, key) -> doRwFullScanRetrieveBatchRequest(targetTxId, false)
+ );
+ }
+
+ private CompletableFuture<?> doRwFullScanRetrieveBatchRequest(UUID targetTxId, boolean full) {
+ return partitionReplicaListener.invoke(
+ TABLE_MESSAGES_FACTORY.readWriteScanRetrieveBatchReplicaRequest()
+ .groupId(grpId)
+ .transactionId(targetTxId)
+ .term(1L)
+ .scanId(1)
+ .batchSize(100)
+ .full(full)
+ .build(),
+ localNode.id()
);
}
@@ -1698,9 +1751,7 @@ public class PartitionReplicaListenerTest extends IgniteAbstractTest {
TestKey key = nextKey();
if (RequestTypes.looksUpFirst(requestType)) {
- UUID tx0 = beginTx();
- upsert(tx0, binaryRow(key, someValue));
- cleanup(tx0);
+ upsertInNewTxFor(key);
// While handling the upsert, our mocks were touched, let's reset them to prevent false-positives during verification.
Mockito.reset(schemaSyncService);
@@ -1708,7 +1759,7 @@ public class PartitionReplicaListenerTest extends IgniteAbstractTest {
when(catalogService.activeCatalogVersion(anyLong())).thenReturn(42);
- UUID targetTxId = beginTx();
+ UUID targetTxId = newTxId();
CompletableFuture<?> future = listenerInvocation.invoke(targetTxId, key);
@@ -1730,6 +1781,12 @@ public class PartitionReplicaListenerTest extends IgniteAbstractTest {
assertThat(catalogVersionAware.requiredCatalogVersion(), is(42));
}
+ private void upsertInNewTxFor(TestKey key) {
+ UUID tx0 = newTxId();
+ upsert(tx0, binaryRow(key, someValue));
+ cleanup(tx0);
+ }
+
@Test
public void replaceRequestIsSuppliedWithRequiredCatalogVersion() {
testWritesAreSuppliedWithRequiredCatalogVersion(RequestType.RW_REPLACE, (targetTxId, key) -> {
@@ -1763,8 +1820,146 @@ public class PartitionReplicaListenerTest extends IgniteAbstractTest {
.map(Arguments::of);
}
- private static UUID beginTx() {
- return TestTransactionIds.newTransactionId();
+ @CartesianTest
+ @CartesianTest.MethodFactory("singleRowRwOperationTypesFactory")
+ void singleRowRwOperationsFailIfTableAlteredAfterTxStart(
+ RequestType requestType,
+ boolean onExistingRow,
+ boolean full
+ ) {
+ ListenerInvocation invocation = null;
+
+ if (RequestTypes.isSingleRowRwPkOnly(requestType)) {
+ invocation = (targetTxId, key) -> {
+ return doSingleRowPkRequest(targetTxId, marshalKeyOrKeyValue(requestType, key), requestType); // NOTE(review): no PK overload taking 'full' is visible in this patch
+ };
+ } else if (RequestTypes.isSingleRowRwFullRow(requestType)) {
+ invocation = (targetTxId, key) -> {
+ return doSingleRowRequest(targetTxId, marshalKeyOrKeyValue(requestType, key), requestType, full); // forward 'full' so the cartesian parameter is not ignored
+ };
+ } else {
+ fail("Uncovered type: " + requestType);
+ }
+
+ testRwOperationsFailIfTableAlteredAfterTxStart(requestType, onExistingRow, invocation);
+ }
+
+ @SuppressWarnings("unused")
+ private static ArgumentSets singleRowRwOperationTypesFactory() {
+ return ArgumentSets.argumentsForFirstParameter(singleRowRwOperationTypes())
+ .argumentsForNextParameter(false, true)
+ .argumentsForNextParameter(false, true);
+ }
+
+ private static Stream<RequestType> singleRowRwOperationTypes() {
+ return Arrays.stream(RequestType.values())
+ .filter(RequestTypes::isSingleRowRw);
+ }
+
+ private void testRwOperationsFailIfTableAlteredAfterTxStart(
+ RequestType requestType,
+ boolean onExistingRow,
+ ListenerInvocation listenerInvocation
+ ) {
+ TestKey key = nextKey();
+
+ if (onExistingRow) {
+ upsertInNewTxFor(key);
+ }
+
+ UUID txId = newTxId();
+ HybridTimestamp txBeginTs = TransactionIds.beginTimestamp(txId);
+
+ CatalogTableDescriptor tableVersion1 = mock(CatalogTableDescriptor.class);
+ CatalogTableDescriptor tableVersion2 = mock(CatalogTableDescriptor.class);
+ when(tableVersion1.tableVersion()).thenReturn(CURRENT_SCHEMA_VERSION);
+ when(tableVersion2.tableVersion()).thenReturn(NEXT_SCHEMA_VERSION);
+
+ when(catalogTables.table(tblId, txBeginTs.longValue())).thenReturn(tableVersion1);
+ when(catalogTables.table(eq(tblId), gt(txBeginTs.longValue()))).thenReturn(tableVersion2);
+
+ CompletableFuture<?> future = listenerInvocation.invoke(txId, key);
+
+ boolean expectValidationFailure;
+ if (RequestTypes.neverMisses(requestType)) {
+ expectValidationFailure = true;
+ } else {
+ expectValidationFailure = onExistingRow == RequestTypes.writesIfKeyDoesNotExist(requestType);
+ }
+
+ if (expectValidationFailure) {
+ IncompatibleSchemaException ex = assertWillThrowFast(future, IncompatibleSchemaException.class);
+ assertThat(ex.code(), is(Transactions.TX_INCOMPATIBLE_SCHEMA_ERR));
+ assertThat(
+ ex.getMessage(),
+ is("Table schema was updated after the transaction was started [table=1, startSchema=1, operationSchema=2]")
+ );
+ } else {
+ assertThat(future, willCompleteSuccessfully());
+ }
+ }
+
+ @CartesianTest
+ @CartesianTest.MethodFactory("multiRowRwOperationTypesFactory")
+ void multiRowRwOperationsFailIfTableAlteredAfterTxStart(
+ RequestType requestType, boolean onExistingRow, boolean full
+ ) {
+ ListenerInvocation invocation = null;
+
+ if (RequestTypes.isMultipleRowsRwPkOnly(requestType)) {
+ invocation = (targetTxId, key) -> {
+ return doMultiRowPkRequest(targetTxId, List.of(marshalKeyOrKeyValue(requestType, key)), requestType, full);
+ };
+ } else if (RequestTypes.isMultipleRowsRwFullRows(requestType)) {
+ invocation = (targetTxId, key) -> {
+ return doMultiRowRequest(targetTxId, List.of(marshalKeyOrKeyValue(requestType, key)), requestType, full);
+ };
+ } else {
+ fail("Uncovered type: " + requestType);
+ }
+
+ testRwOperationsFailIfTableAlteredAfterTxStart(requestType, onExistingRow, invocation);
+ }
+
+ @SuppressWarnings("unused")
+ private static ArgumentSets multiRowRwOperationTypesFactory() {
+ return ArgumentSets.argumentsForFirstParameter(multiRowRwOperationTypes())
+ .argumentsForNextParameter(false, true)
+ .argumentsForNextParameter(false, true);
+ }
+
+ private static Stream<RequestType> multiRowRwOperationTypes() {
+ return Arrays.stream(RequestType.values())
+ .filter(RequestTypes::isMultipleRowsRw);
+ }
+
+ @CartesianTest
+ void replaceRequestFailsIfTableAlteredAfterTxStart(
+ @Values(booleans = {false, true}) boolean onExistingRow,
+ @Values(booleans = {false, true}) boolean full
+ ) {
+ testRwOperationsFailIfTableAlteredAfterTxStart(RequestType.RW_REPLACE, onExistingRow, (targetTxId, key) -> {
+ return doReplaceRequest(
+ targetTxId,
+ marshalKeyOrKeyValue(RequestType.RW_REPLACE, key),
+ marshalKeyOrKeyValue(RequestType.RW_REPLACE, key),
+ full
+ );
+ });
+ }
+
+ @CartesianTest
+ void rwScanRequestFailsIfTableAlteredAfterTxStart(
+ @Values(booleans = {false, true}) boolean onExistingRow,
+ @Values(booleans = {false, true}) boolean full
+ ) {
+ testRwOperationsFailIfTableAlteredAfterTxStart(RequestType.RW_SCAN, onExistingRow, (targetTxId, key) -> {
+ return doRwFullScanRetrieveBatchRequest(targetTxId, full);
+ });
+ }
+
+ private UUID newTxId() {
+ return transactionIdFor(clock.now());
}
private void upsert(UUID txId, BinaryRow row) {
diff --git a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replicator/action/RequestTypeTest.java b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replicator/action/RequestTypeTest.java
new file mode 100644
index 0000000000..89b8767ed8
--- /dev/null
+++ b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replicator/action/RequestTypeTest.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.table.distributed.replicator.action;
+
+
+import static org.apache.ignite.internal.table.distributed.replicator.action.RequestType.RO_GET;
+import static org.apache.ignite.internal.table.distributed.replicator.action.RequestType.RO_GET_ALL;
+import static org.apache.ignite.internal.table.distributed.replicator.action.RequestType.RO_SCAN;
+import static org.apache.ignite.internal.table.distributed.replicator.action.RequestType.RW_GET;
+import static org.apache.ignite.internal.table.distributed.replicator.action.RequestType.RW_GET_ALL;
+import static org.apache.ignite.internal.table.distributed.replicator.action.RequestType.RW_SCAN;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import java.util.EnumSet;
+import java.util.Set;
+import org.junit.jupiter.api.Test;
+
+class RequestTypeTest {
+ @Test
+ void isRwReadWorksAsExpected() {
+ Set<RequestType> expectedRwReads = EnumSet.of(RW_GET, RW_GET_ALL, RW_SCAN);
+
+ for (RequestType requestType : RequestType.values()) {
+ if (expectedRwReads.contains(requestType)) {
+ assertTrue(requestType.isRwRead(), requestType + " must be an RW read");
+ } else {
+ assertFalse(requestType.isRwRead(), requestType + " must not be an RW read");
+ }
+ }
+ }
+
+ @Test
+ void isWriteWorksAsExpected() {
+ Set<RequestType> expectedNonWrites = EnumSet.of(RW_GET, RW_GET_ALL, RW_SCAN, RO_GET, RO_GET_ALL, RO_SCAN);
+
+ for (RequestType requestType : RequestType.values()) {
+ if (expectedNonWrites.contains(requestType)) {
+ assertFalse(requestType.isWrite(), requestType + " must not be a write");
+ } else {
+ assertTrue(requestType.isWrite(), requestType + " must be a write");
+ }
+ }
+ }
+}
diff --git a/modules/table/src/testFixtures/java/org/apache/ignite/distributed/ItTxTestCluster.java b/modules/table/src/testFixtures/java/org/apache/ignite/distributed/ItTxTestCluster.java
index 4beff4a5d4..e69a6fdf90 100644
--- a/modules/table/src/testFixtures/java/org/apache/ignite/distributed/ItTxTestCluster.java
+++ b/modules/table/src/testFixtures/java/org/apache/ignite/distributed/ItTxTestCluster.java
@@ -26,6 +26,9 @@ import static org.apache.ignite.utils.ClusterServiceTestUtils.waitForTopology;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;
+import static org.mockito.ArgumentMatchers.anyInt;
+import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.Mockito.lenient;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
@@ -47,6 +50,7 @@ import java.util.stream.IntStream;
import org.apache.ignite.internal.affinity.AffinityUtils;
import org.apache.ignite.internal.affinity.Assignment;
import org.apache.ignite.internal.catalog.CatalogService;
+import org.apache.ignite.internal.catalog.descriptors.CatalogTableDescriptor;
import org.apache.ignite.internal.cluster.management.ClusterManagementGroupManager;
import org.apache.ignite.internal.cluster.management.topology.api.LogicalNode;
import org.apache.ignite.internal.cluster.management.topology.api.LogicalTopologyEventListener;
@@ -93,6 +97,7 @@ import org.apache.ignite.internal.table.distributed.index.IndexBuilder;
import org.apache.ignite.internal.table.distributed.index.IndexUpdateHandler;
import org.apache.ignite.internal.table.distributed.raft.PartitionDataStorage;
import org.apache.ignite.internal.table.distributed.raft.PartitionListener;
+import org.apache.ignite.internal.table.distributed.replicator.CatalogTables;
import org.apache.ignite.internal.table.distributed.replicator.PartitionReplicaListener;
import org.apache.ignite.internal.table.distributed.replicator.TransactionStateResolver;
import org.apache.ignite.internal.table.distributed.schema.SchemaSyncService;
@@ -463,6 +468,10 @@ public class ItTxTestCluster {
raftSvc -> {
try {
DummySchemaManagerImpl schemaManager = new DummySchemaManagerImpl(schemaDescriptor);
+
+ CatalogTables catalogTables = mock(CatalogTables.class);
+ lenient().when(catalogTables.table(anyInt(), anyLong())).thenReturn(mock(CatalogTableDescriptor.class));
+
replicaManagers.get(assignment).startReplica(
new TablePartitionId(tableId, partId),
completedFuture(null),
@@ -488,6 +497,7 @@ public class ItTxTestCluster {
mock(IndexBuilder.class),
mock(SchemaSyncService.class, invocation -> completedFuture(null)),
mock(CatalogService.class),
+ catalogTables,
tablesConfig,
placementDriver
),
diff --git a/modules/table/src/testFixtures/java/org/apache/ignite/distributed/replicator/action/RequestTypes.java b/modules/table/src/testFixtures/java/org/apache/ignite/distributed/replicator/action/RequestTypes.java
index 1697dd1a92..737639b27a 100644
--- a/modules/table/src/testFixtures/java/org/apache/ignite/distributed/replicator/action/RequestTypes.java
+++ b/modules/table/src/testFixtures/java/org/apache/ignite/distributed/replicator/action/RequestTypes.java
@@ -24,13 +24,24 @@ import org.apache.ignite.internal.table.distributed.replicator.action.RequestTyp
*/
public class RequestTypes {
/**
- * Returns {@code true} if the operation works with a single row.
+ * Returns {@code true} if the operation works with a single PK and it's RW.
*/
- public static boolean isSingleRow(RequestType type) {
+ public static boolean isSingleRowRwPkOnly(RequestType type) {
switch (type) {
case RW_GET:
case RW_DELETE:
case RW_GET_AND_DELETE:
+ return true;
+ default:
+ return false;
+ }
+ }
+
+ /**
+ * Returns {@code true} if the operation works with a single row (full row) and it's RW.
+ */
+ public static boolean isSingleRowRwFullRow(RequestType type) {
+ switch (type) {
case RW_DELETE_EXACT:
case RW_INSERT:
case RW_UPSERT:
@@ -43,20 +54,38 @@ public class RequestTypes {
}
}
+ /**
+ * Returns {@code true} if the operation works with a single row and it's RW.
+ */
+ public static boolean isSingleRowRw(RequestType type) {
+ return isSingleRowRwPkOnly(type) || isSingleRowRwFullRow(type);
+ }
+
/**
* Returns {@code true} if the operation works with a single row and it's a write.
*/
public static boolean isSingleRowWrite(RequestType type) {
- return isSingleRow(type) && type != RequestType.RW_GET;
+ return isSingleRowRw(type) && type.isWrite();
}
/**
- * Returns {@code true} if the operation works with multiple rows.
+ * Returns {@code true} if the operation works with multiple PKs and it's RW.
*/
- public static boolean isMultipleRows(RequestType type) {
+ public static boolean isMultipleRowsRwPkOnly(RequestType type) {
switch (type) {
case RW_GET_ALL:
case RW_DELETE_ALL:
+ return true;
+ default:
+ return false;
+ }
+ }
+
+ /**
+ * Returns {@code true} if the operation works with multiple rows (full rows) and it's RW.
+ */
+ public static boolean isMultipleRowsRwFullRows(RequestType type) {
+ switch (type) {
case RW_DELETE_EXACT_ALL:
case RW_INSERT_ALL:
case RW_UPSERT_ALL:
@@ -67,10 +96,17 @@ public class RequestTypes {
}
/**
- * Returns {@code true} if the operation works with multiple rows.
+ * Returns {@code true} if the operation works with multiple rows and it's RW.
+ */
+ public static boolean isMultipleRowsRw(RequestType type) {
+ return isMultipleRowsRwPkOnly(type) || isMultipleRowsRwFullRows(type);
+ }
+
+ /**
+ * Returns {@code true} if the operation works with multiple rows and it is a write.
*/
public static boolean isMultipleRowsWrite(RequestType type) {
- return isMultipleRows(type) && type != RequestType.RW_GET_ALL;
+ return isMultipleRowsRw(type) && type.isWrite();
}
/**
@@ -101,4 +137,32 @@ public class RequestTypes {
return true;
}
}
+
+ /**
+ * Returns {@code false} if the operation only writes when the key has no value yet (inserts); {@code true} otherwise.
+ */
+ public static boolean writesIfKeyDoesNotExist(RequestType type) {
+ switch (type) {
+ case RW_INSERT:
+ case RW_INSERT_ALL:
+ return false;
+ default:
+ return true;
+ }
+ }
+
+ /**
+ * Returns {@code true} if the operation always reads or writes something, regardless of whether there is something under the key
+ * in the table or not.
+ */
+ public static boolean neverMisses(RequestType type) {
+ switch (type) {
+ case RW_UPSERT:
+ case RW_UPSERT_ALL:
+ case RW_GET_AND_UPSERT:
+ return true;
+ default:
+ return false;
+ }
+ }
}
diff --git a/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/impl/DummyInternalTableImpl.java b/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/impl/DummyInternalTableImpl.java
index 3a4fe1692b..a777af6887 100644
--- a/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/impl/DummyInternalTableImpl.java
+++ b/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/impl/DummyInternalTableImpl.java
@@ -19,6 +19,8 @@ package org.apache.ignite.internal.table.impl;
import static java.util.concurrent.CompletableFuture.completedFuture;
import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyInt;
+import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.Mockito.RETURNS_DEEP_STUBS;
import static org.mockito.Mockito.lenient;
import static org.mockito.Mockito.mock;
@@ -42,6 +44,7 @@ import org.apache.ignite.configuration.NamedListView;
import org.apache.ignite.distributed.TestPartitionDataStorage;
import org.apache.ignite.internal.TestHybridClock;
import org.apache.ignite.internal.catalog.CatalogService;
+import org.apache.ignite.internal.catalog.descriptors.CatalogTableDescriptor;
import org.apache.ignite.internal.configuration.NamedListConfiguration;
import org.apache.ignite.internal.hlc.HybridClock;
import org.apache.ignite.internal.hlc.HybridTimestamp;
@@ -85,6 +88,7 @@ import org.apache.ignite.internal.table.distributed.index.IndexBuilder;
import org.apache.ignite.internal.table.distributed.index.IndexUpdateHandler;
import org.apache.ignite.internal.table.distributed.raft.PartitionDataStorage;
import org.apache.ignite.internal.table.distributed.raft.PartitionListener;
+import org.apache.ignite.internal.table.distributed.replicator.CatalogTables;
import org.apache.ignite.internal.table.distributed.replicator.PartitionReplicaListener;
import org.apache.ignite.internal.table.distributed.replicator.TransactionStateResolver;
import org.apache.ignite.internal.table.distributed.schema.SchemaSyncService;
@@ -342,6 +346,12 @@ public class DummyInternalTableImpl extends InternalTableImpl {
DummySchemaManagerImpl schemaManager = new DummySchemaManagerImpl(schema);
+ CatalogTables catalogTables = mock(CatalogTables.class);
+ CatalogTableDescriptor tableDescriptor = mock(CatalogTableDescriptor.class);
+
+ lenient().when(catalogTables.table(anyInt(), anyLong())).thenReturn(tableDescriptor);
+ lenient().when(tableDescriptor.tableVersion()).thenReturn(1);
+
TablesConfiguration tablesConfig = mock(TablesConfiguration.class);
NamedConfigurationTree<TableConfiguration, TableView, TableChange> tablesTree = mock(NamedListConfiguration.class);
NamedListView<TableView> tablesList = mock(NamedListView.class);
@@ -380,6 +390,7 @@ public class DummyInternalTableImpl extends InternalTableImpl {
mock(IndexBuilder.class),
mock(SchemaSyncService.class, invocation -> completedFuture(null)),
mock(CatalogService.class),
+ catalogTables,
tablesConfig,
new TestPlacementDriver(LOCAL_NODE.name())
);