You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@ignite.apache.org by "sashapolo (via GitHub)" <gi...@apache.org> on 2023/05/11 11:37:02 UTC

[GitHub] [ignite-3] sashapolo commented on a diff in pull request #2055: IGNITE-19228 Schema validation during tx processing: common framework

sashapolo commented on code in PR #2055:
URL: https://github.com/apache/ignite-3/pull/2055#discussion_r1190873028


##########
modules/core/src/testFixtures/java/org/apache/ignite/internal/testframework/asserts/CompletableFutureAssert.java:
##########
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.testframework.asserts;
+
+import static org.junit.jupiter.api.Assertions.fail;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+/**
+ * Assertions related to {@link CompletableFuture}.
+ */
+public class CompletableFutureAssert {

Review Comment:
   This class is redundant, you should use `CompletableFutureExceptionMatcher` instead



##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/AbortDueToIncompatibleSchemaException.java:
##########
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.table.distributed.replicator;
+
+import org.apache.ignite.lang.ErrorGroups.Transactions;
+import org.apache.ignite.tx.TransactionException;
+
+/**
+ * Thrown when, during an attempt to commit a transaction, it turns out that the transaction cannot be committed
+ * because an incompatible schema change has happened.
+ */
+public class AbortDueToIncompatibleSchemaException extends TransactionException {

Review Comment:
   I would propose to rename this exception to `IncompatibleSchemaAbortException`, it's a mouthful otherwise



##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/schema/FullTableSchema.java:
##########
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.table.distributed.schema;
+
+import static java.util.function.Function.identity;
+import static java.util.stream.Collectors.toList;
+import static java.util.stream.Collectors.toMap;
+
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import org.apache.ignite.internal.catalog.descriptors.IndexDescriptor;
+import org.apache.ignite.internal.catalog.descriptors.TableColumnDescriptor;
+
+/**
+ * Represents a full table schema: that is, the definition of the table and all objects (indexes, constraints, etc)
+ * that belong to the table.
+ */
+public class FullTableSchema {
+    private final int schemaVersion;
+    private final int tableId;
+
+    private final List<TableColumnDescriptor> columns;
+
+    private final List<IndexDescriptor> indexes;
+
+    /**
+     * Constructor.
+     */
+    public FullTableSchema(
+            int schemaVersion,
+            int tableId,
+            List<TableColumnDescriptor> columns,
+            List<IndexDescriptor> indexes
+    ) {
+        this.schemaVersion = schemaVersion;
+        this.tableId = tableId;
+        this.columns = columns;
+        this.indexes = indexes;
+    }
+
+    /**
+     * Returns version of the table definition.
+     *
+     * @return Version of the table definition.
+     */
+    public int schemaVersion() {
+        return schemaVersion;
+    }
+
+    /**
+     * Returns ID of the table.
+     *
+     * @return ID of the table
+     */
+    public int tableId() {
+        return tableId;
+    }
+
+    /**
+     * Returns definitions of the columns of the table.
+     *
+     * @return Definitions of the columns of the table.
+     */
+    public List<TableColumnDescriptor> columns() {
+        return columns;
+    }
+
+    /**
+     * Returns definitions of indexes belonging to the table.
+     *
+     * @return Definitions of indexes belonging to the table.
+     */
+    public List<IndexDescriptor> indexes() {
+        return indexes;
+    }
+
+    /**
+     * Computes a diff of this schema from a previous schema.
+     *
+     * @param prevSchema Previous table schema.
+     * @return Difference between the schemas.
+     */
+    public TableDefinitionDiff diffFrom(FullTableSchema prevSchema) {
+        Map<String, TableColumnDescriptor> prevColumnsByName = prevSchema.columns.stream()
+                .collect(toMap(TableColumnDescriptor::name, identity()));
+        Map<String, TableColumnDescriptor> thisColumnsByName = this.columns.stream()
+                .collect(toMap(TableColumnDescriptor::name, identity()));
+
+        Set<String> addedColumnNames = subtract(thisColumnsByName.keySet(), prevColumnsByName.keySet());
+        Set<String> removedColumnNames = subtract(prevColumnsByName.keySet(), thisColumnsByName.keySet());
+
+        List<TableColumnDescriptor> addedColumns = thisColumnsByName.values().stream()
+                .filter(col -> addedColumnNames.contains(col.name()))
+                .collect(toList());
+        List<TableColumnDescriptor> removedColumns = prevColumnsByName.values().stream()
+                .filter(col -> removedColumnNames.contains(col.name()))
+                .collect(toList());
+
+        Set<String> intersectionColumnNames = intersect(thisColumnsByName.keySet(), prevColumnsByName.keySet());
+        List<ColumnDefinitionDiff> changedColumns = new ArrayList<>();
+        for (String commonColumnName : intersectionColumnNames) {
+            TableColumnDescriptor prevColumn = prevColumnsByName.get(commonColumnName);
+            TableColumnDescriptor thisColumn = thisColumnsByName.get(commonColumnName);
+
+            if (columnChanged(prevColumn, thisColumn)) {
+                changedColumns.add(new ColumnDefinitionDiff(prevColumn, thisColumn));
+            }
+        }
+
+        Map<String, IndexDescriptor> prevIndexesByName = prevSchema.indexes.stream()
+                .collect(toMap(IndexDescriptor::name, identity()));
+        Map<String, IndexDescriptor> thisIndexesByName = this.indexes.stream()
+                .collect(toMap(IndexDescriptor::name, identity()));
+
+        Set<String> addedIndexNames = subtract(thisIndexesByName.keySet(), prevIndexesByName.keySet());
+        Set<String> removedIndexNames = subtract(prevIndexesByName.keySet(), thisIndexesByName.keySet());
+
+        List<IndexDescriptor> addedIndexes = thisIndexesByName.values().stream()
+                .filter(col -> addedIndexNames.contains(col.name()))
+                .collect(toList());
+        List<IndexDescriptor> removedIndexes = prevIndexesByName.values().stream()
+                .filter(col -> removedIndexNames.contains(col.name()))
+                .collect(toList());
+
+        return new TableDefinitionDiff(addedColumns, removedColumns, changedColumns, addedIndexes, removedIndexes);
+    }
+
+    private static Set<String> subtract(Set<String> minuend, Set<String> subtrahend) {

Review Comment:
   There's an exactly the same method in `RebalanceUtil`, I would suggest to move it to a more common place, like `CollectionUtils`



##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/ForwardValidationResult.java:
##########
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.table.distributed.replicator;
+
+import java.util.Objects;
+import java.util.UUID;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Result of forward schema compatibility validation.
+ */
+public class ForwardValidationResult {
+    private static final ForwardValidationResult SUCCESS = new ForwardValidationResult(true, null, -1, -1);
+
+    private final boolean ok;
+    @Nullable
+    private final UUID failedTableId;
+    private final int fromSchemaVersion;
+    private final int toSchemaVersion;
+
+    /**
+     * Returns a successful validation result.
+     *
+     * @return A successful validation result.
+     */
+    public static ForwardValidationResult success() {
+        return SUCCESS;
+    }
+
+    /**
+     * Creates a validation result denoting validation failure.
+     *
+     * @param failedTableId Table which schema change is incompatible.
+     * @param fromSchemaVersion Version number of the schema from which an incompatible transition tried to be made.
+     * @param toSchemaVersion Version number of the schema to which an incompatible transition tried to be made.
+     * @return A validation result for a failure.
+     */
+    public static ForwardValidationResult failure(UUID failedTableId, int fromSchemaVersion, int toSchemaVersion) {
+        return new ForwardValidationResult(false, failedTableId, fromSchemaVersion, toSchemaVersion);
+    }
+
+    private ForwardValidationResult(boolean ok, @Nullable UUID failedTableId, int fromSchemaVersion, int toSchemaVersion) {
+        this.ok = ok;
+        this.failedTableId = failedTableId;
+        this.fromSchemaVersion = fromSchemaVersion;
+        this.toSchemaVersion = toSchemaVersion;
+    }
+
+    /**
+     * Returns whether the validation was successful.
+     *
+     * @return Whether the validation was successful
+     */
+    public boolean isOk() {
+        return ok;
+    }
+
+    /**
+     * Returns ID of the table for which the validation has failed. Should only be called for a failed validation result,
+     * otherwise an exception is thrown.
+     *
+     * @return Table ID.
+     */
+    public UUID failedTableId() {
+        return Objects.requireNonNull(failedTableId);
+    }
+
+    /**
+     * Returns version number of the schema from which an incompatible transition tried to be made.
+     *
+     * @return Version number of the schema from which an incompatible transition tried to be made.
+     */
+    public int fromSchemaVersion() {
+        return fromSchemaVersion;

Review Comment:
   Should we also throw an exception here if this is a successful result?



##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/ForwardValidationResult.java:
##########
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.table.distributed.replicator;
+
+import java.util.Objects;
+import java.util.UUID;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Result of forward schema compatibility validation.
+ */
+public class ForwardValidationResult {
+    private static final ForwardValidationResult SUCCESS = new ForwardValidationResult(true, null, -1, -1);
+
+    private final boolean ok;
+    @Nullable
+    private final UUID failedTableId;
+    private final int fromSchemaVersion;
+    private final int toSchemaVersion;
+
+    /**
+     * Returns a successful validation result.
+     *
+     * @return A successful validation result.
+     */
+    public static ForwardValidationResult success() {
+        return SUCCESS;
+    }
+
+    /**
+     * Creates a validation result denoting validation failure.
+     *
+     * @param failedTableId Table which schema change is incompatible.
+     * @param fromSchemaVersion Version number of the schema from which an incompatible transition tried to be made.
+     * @param toSchemaVersion Version number of the schema to which an incompatible transition tried to be made.
+     * @return A validation result for a failure.
+     */
+    public static ForwardValidationResult failure(UUID failedTableId, int fromSchemaVersion, int toSchemaVersion) {
+        return new ForwardValidationResult(false, failedTableId, fromSchemaVersion, toSchemaVersion);
+    }
+
+    private ForwardValidationResult(boolean ok, @Nullable UUID failedTableId, int fromSchemaVersion, int toSchemaVersion) {
+        this.ok = ok;
+        this.failedTableId = failedTableId;
+        this.fromSchemaVersion = fromSchemaVersion;
+        this.toSchemaVersion = toSchemaVersion;
+    }
+
+    /**
+     * Returns whether the validation was successful.
+     *
+     * @return Whether the validation was successful
+     */
+    public boolean isOk() {

Review Comment:
   I would propose to rename this method to `isSuccessful` or `isSuccess`, just to be consistent



##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java:
##########
@@ -976,13 +983,34 @@ private CompletableFuture<Void> processTxFinishAction(TxFinishReplicaRequest req
         List<TablePartitionId> aggregatedGroupIds = request.groups().values().stream()
                 .flatMap(List::stream)
                 .map(IgniteBiTuple::get1)
-                .collect(Collectors.toList());
+                .collect(toList());
 
         UUID txId = request.txId();
 
-        boolean commit = request.commit();
+        return schemaCompatValidator.validateForwards(txId, aggregatedGroupIds, request.commit(), request.commitTimestamp())
+                .thenCompose(validationResult -> {
+                    boolean stillCommit = request.commit() && validationResult.isOk();
 
-        CompletableFuture<Object> changeStateFuture = finishTransaction(aggregatedGroupIds, txId, commit);
+                    return finishAndCleanup(request, stillCommit, aggregatedGroupIds, txId)
+                            .thenCompose(result -> {
+                                if (validationResult.isOk()) {
+                                    return completedFuture(result);
+                                } else {
+                                    return failedFuture(new AbortDueToIncompatibleSchemaException("Commit failed because schema "

Review Comment:
   I don't understand this logic. Should throwing this exception affect the `finishTransaction` method so that the state of the transaction changes to `ABORTED`?



##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/SchemaCompatValidator.java:
##########
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.table.distributed.replicator;
+
+import static java.util.concurrent.CompletableFuture.completedFuture;
+import static java.util.stream.Collectors.toList;
+
+import java.util.List;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import org.apache.ignite.internal.hlc.HybridTimestamp;
+import org.apache.ignite.internal.replicator.TablePartitionId;
+import org.apache.ignite.internal.table.distributed.schema.FullTableSchema;
+import org.apache.ignite.internal.table.distributed.schema.Schemas;
+import org.apache.ignite.internal.table.distributed.schema.TableDefinitionDiff;
+import org.apache.ignite.internal.tx.TransactionIds;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Validates schema compatibility.
+ */
+class SchemaCompatValidator {
+    private final Schemas schemas;
+
+    SchemaCompatValidator(Schemas schemas) {
+        this.schemas = schemas;
+    }
+
+    /**
+     * Performs forward compatibility validation (if needed). That is, for each table enlisted in the transaction,
+     * checks to see whether the initial schema (identified by the begin timestamp) is forward-compatible with the
+     * commit schema (identified by the commit timestamp).
+     *
+     * <p>If doing an abort (and not commit), the validation always succeeds.
+     *
+     * @param txId ID of the transaction that gets validated.
+     * @param enlistedGroupIds IDs of the partitions that are enlisted with the transaction.
+     * @param commit Whether it's a commit attempt (otherwise it's an abort).
+     * @param commitTimestamp Commit timestamp (or {@code null} if it's an abort).
+     * @return Future completed with validation result.
+     */
+    CompletableFuture<ForwardValidationResult> validateForwards(
+            UUID txId,
+            List<TablePartitionId> enlistedGroupIds,
+            boolean commit,
+            @Nullable HybridTimestamp commitTimestamp
+    ) {
+        if (!commit) {
+            return completedFuture(ForwardValidationResult.success());
+        }
+
+        HybridTimestamp beginTimestamp = TransactionIds.beginTimestamp(txId);
+
+        List<UUID> tableIds = enlistedGroupIds.stream()

Review Comment:
   I think this intermediate variable is redundant, you can use the stream directly inside `thenApply` below



##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/schema/NonHistoricSchemas.java:
##########
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.table.distributed.schema;
+
+import static java.util.concurrent.CompletableFuture.completedFuture;
+import static java.util.stream.Collectors.toList;
+import static org.apache.ignite.lang.ErrorGroups.Client.PROTOCOL_ERR;
+
+import java.util.List;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import org.apache.ignite.internal.catalog.commands.DefaultValue;
+import org.apache.ignite.internal.catalog.descriptors.TableColumnDescriptor;
+import org.apache.ignite.internal.hlc.HybridTimestamp;
+import org.apache.ignite.internal.schema.Column;
+import org.apache.ignite.internal.schema.NativeType;
+import org.apache.ignite.internal.schema.NativeTypeSpec;
+import org.apache.ignite.internal.schema.SchemaDescriptor;
+import org.apache.ignite.internal.schema.SchemaManager;
+import org.apache.ignite.internal.schema.SchemaRegistry;
+import org.apache.ignite.lang.IgniteException;
+import org.apache.ignite.sql.ColumnType;
+
+/**
+ * A dummy implementation over {@link SchemaManager}. It is dummy because:
+ *
+ * <ul>
+ *     <li>It imitates historicity, but always takes the latest known schema</li>
+ *     <li>{@link #tableSchemaVersionsBetween(UUID, HybridTimestamp, HybridTimestamp)} always returns a single schema to avoid
+ *     validation failures</li>
+ * </ul>
+ *
+ * <p>The point of this implementation is to allow the system work in the pre-SchemaSync fashion before the switch to CatalogService
+ * is possible.
+ */
+// TODO: IGNITE-19447 - remove when switched to the CatalogService
+public class NonHistoricSchemas implements Schemas {
+    private final SchemaManager schemaManager;
+
+    public NonHistoricSchemas(SchemaManager schemaManager) {
+        this.schemaManager = schemaManager;
+    }
+
+    @Override
+    public CompletableFuture<?> waitForSchemasAvailability(HybridTimestamp ts) {
+        return completedFuture(null);
+    }
+
+    @Override
+    public List<FullTableSchema> tableSchemaVersionsBetween(UUID tableId, HybridTimestamp fromIncluding, HybridTimestamp toIncluding) {
+        SchemaRegistry schemaRegistry = schemaManager.schemaRegistry(tableId);
+
+        SchemaDescriptor schemaDescriptor = schemaRegistry.schema();
+        FullTableSchema fullSchema = new FullTableSchema(
+                1,
+                1,
+                schemaDescriptor.columnNames().stream()
+                        .map(colName -> {
+                            Column column = schemaDescriptor.column(colName);
+
+                            assert column != null;
+
+                            return columnType(colName, column);
+                        })
+                        .collect(toList()),
+                List.of()
+        );
+
+        return List.of(fullSchema);
+    }
+
+    private static TableColumnDescriptor columnType(String colName, Column column) {
+        return new TableColumnDescriptor(
+                colName,
+                columnTypeFromNativeType(column.type()),
+                column.nullable(),
+                DefaultValue.constant(column.defaultValue())
+        );
+    }
+
+    private static ColumnType columnTypeFromNativeType(NativeType type) {
+        return getColumnType(type.spec());
+    }
+
+    /**
+     * Detects {@link ColumnType} by {@link NativeTypeSpec}.
+     *
+     * @param spec Native type spec.
+     * @return Detected {@link ColumnType}.
+     */
+    public static ColumnType getColumnType(NativeTypeSpec spec) {

Review Comment:
   There exists an exact copy of this method in `ClientTableCommon`, I would suggest to extract it into a common place



##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/schema/NonHistoricSchemas.java:
##########
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.table.distributed.schema;
+
+import static java.util.concurrent.CompletableFuture.completedFuture;
+import static java.util.stream.Collectors.toList;
+import static org.apache.ignite.lang.ErrorGroups.Client.PROTOCOL_ERR;
+
+import java.util.List;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import org.apache.ignite.internal.catalog.commands.DefaultValue;
+import org.apache.ignite.internal.catalog.descriptors.TableColumnDescriptor;
+import org.apache.ignite.internal.hlc.HybridTimestamp;
+import org.apache.ignite.internal.schema.Column;
+import org.apache.ignite.internal.schema.NativeType;
+import org.apache.ignite.internal.schema.NativeTypeSpec;
+import org.apache.ignite.internal.schema.SchemaDescriptor;
+import org.apache.ignite.internal.schema.SchemaManager;
+import org.apache.ignite.internal.schema.SchemaRegistry;
+import org.apache.ignite.lang.IgniteException;
+import org.apache.ignite.sql.ColumnType;
+
+/**
+ * A dummy implementation over {@link SchemaManager}. It is dummy because:
+ *
+ * <ul>
+ *     <li>It imitates historicity, but always takes the latest known schema</li>
+ *     <li>{@link #tableSchemaVersionsBetween(UUID, HybridTimestamp, HybridTimestamp)} always returns a single schema to avoid
+ *     validation failures</li>
+ * </ul>
+ *
+ * <p>The point of this implementation is to allow the system work in the pre-SchemaSync fashion before the switch to CatalogService
+ * is possible.
+ */
+// TODO: IGNITE-19447 - remove when switched to the CatalogService
+public class NonHistoricSchemas implements Schemas {
+    private final SchemaManager schemaManager;
+
+    public NonHistoricSchemas(SchemaManager schemaManager) {
+        this.schemaManager = schemaManager;
+    }
+
+    @Override
+    public CompletableFuture<?> waitForSchemasAvailability(HybridTimestamp ts) {
+        return completedFuture(null);
+    }
+
+    @Override
+    public List<FullTableSchema> tableSchemaVersionsBetween(UUID tableId, HybridTimestamp fromIncluding, HybridTimestamp toIncluding) {
+        SchemaRegistry schemaRegistry = schemaManager.schemaRegistry(tableId);
+
+        SchemaDescriptor schemaDescriptor = schemaRegistry.schema();
+        FullTableSchema fullSchema = new FullTableSchema(
+                1,
+                1,
+                schemaDescriptor.columnNames().stream()
+                        .map(colName -> {
+                            Column column = schemaDescriptor.column(colName);
+
+                            assert column != null;
+
+                            return columnType(colName, column);
+                        })
+                        .collect(toList()),
+                List.of()
+        );
+
+        return List.of(fullSchema);
+    }
+
+    private static TableColumnDescriptor columnType(String colName, Column column) {

Review Comment:
   `colName` parameter is redundant, `column` already contains a name



##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/schema/ColumnDefinitionDiff.java:
##########
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.table.distributed.schema;
+
+import org.apache.ignite.internal.catalog.descriptors.TableColumnDescriptor;
+
+/**
+ * Captures a difference between 'old' and 'new' versions of the same column definition.

Review Comment:
   Should the fields be called `old` and `new` instead of `prev` and `next`?



##########
modules/table/src/testFixtures/java/org/apache/ignite/internal/table/impl/DummySchemas.java:
##########
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.table.impl;
+
+import static java.util.concurrent.CompletableFuture.completedFuture;
+import static java.util.stream.Collectors.toList;
+
+import java.util.List;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import org.apache.ignite.internal.catalog.commands.DefaultValue;
+import org.apache.ignite.internal.catalog.descriptors.TableColumnDescriptor;
+import org.apache.ignite.internal.hlc.HybridTimestamp;
+import org.apache.ignite.internal.schema.Column;
+import org.apache.ignite.internal.schema.NativeType;
+import org.apache.ignite.internal.schema.SchemaDescriptor;
+import org.apache.ignite.internal.schema.SchemaRegistry;
+import org.apache.ignite.internal.table.distributed.schema.FullTableSchema;
+import org.apache.ignite.internal.table.distributed.schema.NonHistoricSchemas;
+import org.apache.ignite.internal.table.distributed.schema.Schemas;
+import org.apache.ignite.sql.ColumnType;
+
+/**
+ * Dummy {@link Schemas} implementation that is not historic and always uses same {@link SchemaRegistry}.
+ */
+public class DummySchemas implements Schemas {
+    private final SchemaRegistry schemaRegistry;
+
+    public DummySchemas(SchemaRegistry schemaRegistry) {
+        this.schemaRegistry = schemaRegistry;
+    }
+
+    @Override
+    public CompletableFuture<?> waitForSchemasAvailability(HybridTimestamp ts) {
+        return completedFuture(null);
+    }
+
+    @Override
+    public List<FullTableSchema> tableSchemaVersionsBetween(UUID tableId, HybridTimestamp fromIncluding, HybridTimestamp toIncluding) {
+        SchemaDescriptor schemaDescriptor = schemaRegistry.schema();
+        FullTableSchema fullSchema = new FullTableSchema(
+                1,
+                1,
+                schemaDescriptor.columnNames().stream()

Review Comment:
   Please extract this into a variable



##########
modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerTest.java:
##########
@@ -290,6 +314,8 @@ public void beforeTest(@InjectConfiguration DataStorageConfiguration dsCfg) {
 
         lenient().when(safeTimeClock.waitFor(any())).thenReturn(completedFuture(null));
 
+        lenient().when(schemas.waitForSchemasAvailability(any())).thenReturn(completedFuture(null));

Review Comment:
   I know that it's not related to this PR, but all these `lenient()` calls can be removed using the `MockitoSettings` annotation



##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/schema/FullTableSchema.java:
##########
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.table.distributed.schema;
+
+import static java.util.function.Function.identity;
+import static java.util.stream.Collectors.toList;
+import static java.util.stream.Collectors.toMap;
+
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import org.apache.ignite.internal.catalog.descriptors.IndexDescriptor;
+import org.apache.ignite.internal.catalog.descriptors.TableColumnDescriptor;
+
+/**
+ * Represents a full table schema: that is, the definition of the table and all objects (indexes, constraints, etc)
+ * that belong to the table.
+ */
+public class FullTableSchema {
+    private final int schemaVersion;
+    private final int tableId;
+
+    private final List<TableColumnDescriptor> columns;
+
+    private final List<IndexDescriptor> indexes;
+
+    /**
+     * Constructor.
+     */
+    public FullTableSchema(
+            int schemaVersion,
+            int tableId,
+            List<TableColumnDescriptor> columns,
+            List<IndexDescriptor> indexes
+    ) {
+        this.schemaVersion = schemaVersion;
+        this.tableId = tableId;
+        this.columns = columns;
+        this.indexes = indexes;
+    }
+
+    /**
+     * Returns version of the table definition.
+     *
+     * @return Version of the table definition.
+     */
+    public int schemaVersion() {
+        return schemaVersion;
+    }
+
+    /**
+     * Returns ID of the table.
+     *
+     * @return ID of the table
+     */
+    public int tableId() {
+        return tableId;
+    }
+
+    /**
+     * Returns definitions of the columns of the table.
+     *
+     * @return Definitions of the columns of the table.
+     */
+    public List<TableColumnDescriptor> columns() {
+        return columns;
+    }
+
+    /**
+     * Returns definitions of indexes belonging to the table.
+     *
+     * @return Definitions of indexes belonging to the table.
+     */
+    public List<IndexDescriptor> indexes() {
+        return indexes;
+    }
+
+    /**
+     * Computes a diff of this schema from a previous schema.
+     *
+     * @param prevSchema Previous table schema.
+     * @return Difference between the schemas.
+     */
+    public TableDefinitionDiff diffFrom(FullTableSchema prevSchema) {
+        Map<String, TableColumnDescriptor> prevColumnsByName = prevSchema.columns.stream()
+                .collect(toMap(TableColumnDescriptor::name, identity()));
+        Map<String, TableColumnDescriptor> thisColumnsByName = this.columns.stream()
+                .collect(toMap(TableColumnDescriptor::name, identity()));
+
+        Set<String> addedColumnNames = subtract(thisColumnsByName.keySet(), prevColumnsByName.keySet());
+        Set<String> removedColumnNames = subtract(prevColumnsByName.keySet(), thisColumnsByName.keySet());
+
+        List<TableColumnDescriptor> addedColumns = thisColumnsByName.values().stream()
+                .filter(col -> addedColumnNames.contains(col.name()))
+                .collect(toList());
+        List<TableColumnDescriptor> removedColumns = prevColumnsByName.values().stream()
+                .filter(col -> removedColumnNames.contains(col.name()))
+                .collect(toList());
+
+        Set<String> intersectionColumnNames = intersect(thisColumnsByName.keySet(), prevColumnsByName.keySet());
+        List<ColumnDefinitionDiff> changedColumns = new ArrayList<>();
+        for (String commonColumnName : intersectionColumnNames) {
+            TableColumnDescriptor prevColumn = prevColumnsByName.get(commonColumnName);
+            TableColumnDescriptor thisColumn = thisColumnsByName.get(commonColumnName);
+
+            if (columnChanged(prevColumn, thisColumn)) {
+                changedColumns.add(new ColumnDefinitionDiff(prevColumn, thisColumn));
+            }
+        }
+
+        Map<String, IndexDescriptor> prevIndexesByName = prevSchema.indexes.stream()
+                .collect(toMap(IndexDescriptor::name, identity()));
+        Map<String, IndexDescriptor> thisIndexesByName = this.indexes.stream()
+                .collect(toMap(IndexDescriptor::name, identity()));
+
+        Set<String> addedIndexNames = subtract(thisIndexesByName.keySet(), prevIndexesByName.keySet());
+        Set<String> removedIndexNames = subtract(prevIndexesByName.keySet(), thisIndexesByName.keySet());
+
+        List<IndexDescriptor> addedIndexes = thisIndexesByName.values().stream()
+                .filter(col -> addedIndexNames.contains(col.name()))
+                .collect(toList());
+        List<IndexDescriptor> removedIndexes = prevIndexesByName.values().stream()
+                .filter(col -> removedIndexNames.contains(col.name()))
+                .collect(toList());
+
+        return new TableDefinitionDiff(addedColumns, removedColumns, changedColumns, addedIndexes, removedIndexes);
+    }
+
+    private static Set<String> subtract(Set<String> minuend, Set<String> subtrahend) {
+        Set<String> result = new HashSet<>(minuend);
+        result.removeAll(subtrahend);
+        return result;
+    }
+
+    private static Set<String> intersect(Set<String> set1, Set<String> set2) {

Review Comment:
   Same here



##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/schema/FullTableSchema.java:
##########
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.table.distributed.schema;
+
+import static java.util.function.Function.identity;
+import static java.util.stream.Collectors.toList;
+import static java.util.stream.Collectors.toMap;
+
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import org.apache.ignite.internal.catalog.descriptors.IndexDescriptor;
+import org.apache.ignite.internal.catalog.descriptors.TableColumnDescriptor;
+
+/**
+ * Represents a full table schema: that is, the definition of the table and all objects (indexes, constraints, etc)
+ * that belong to the table.
+ */
+public class FullTableSchema {
+    private final int schemaVersion;
+    private final int tableId;
+
+    private final List<TableColumnDescriptor> columns;
+
+    private final List<IndexDescriptor> indexes;
+
+    /**
+     * Constructor.
+     */
+    public FullTableSchema(
+            int schemaVersion,
+            int tableId,
+            List<TableColumnDescriptor> columns,
+            List<IndexDescriptor> indexes
+    ) {
+        this.schemaVersion = schemaVersion;
+        this.tableId = tableId;
+        this.columns = columns;
+        this.indexes = indexes;
+    }
+
+    /**
+     * Returns version of the table definition.
+     *
+     * @return Version of the table definition.
+     */
+    public int schemaVersion() {
+        return schemaVersion;
+    }
+
+    /**
+     * Returns ID of the table.
+     *
+     * @return ID of the table
+     */
+    public int tableId() {
+        return tableId;
+    }
+
+    /**
+     * Returns definitions of the columns of the table.
+     *
+     * @return Definitions of the columns of the table.
+     */
+    public List<TableColumnDescriptor> columns() {
+        return columns;
+    }
+
+    /**
+     * Returns definitions of indexes belonging to the table.
+     *
+     * @return Definitions of indexes belonging to the table.
+     */
+    public List<IndexDescriptor> indexes() {
+        return indexes;
+    }
+
+    /**
+     * Computes a diff of this schema from a previous schema.

Review Comment:
   ```suggestion
        * Computes a diff between this and a previous schema.
   ```



##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/schema/FullTableSchema.java:
##########
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.table.distributed.schema;
+
+import static java.util.function.Function.identity;
+import static java.util.stream.Collectors.toList;
+import static java.util.stream.Collectors.toMap;
+
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import org.apache.ignite.internal.catalog.descriptors.IndexDescriptor;
+import org.apache.ignite.internal.catalog.descriptors.TableColumnDescriptor;
+
+/**
+ * Represents a full table schema: that is, the definition of the table and all objects (indexes, constraints, etc)
+ * that belong to the table.
+ */
+public class FullTableSchema {
+    private final int schemaVersion;
+    private final int tableId;
+
+    private final List<TableColumnDescriptor> columns;
+
+    private final List<IndexDescriptor> indexes;
+
+    /**
+     * Constructor.
+     */
+    public FullTableSchema(
+            int schemaVersion,
+            int tableId,
+            List<TableColumnDescriptor> columns,
+            List<IndexDescriptor> indexes
+    ) {
+        this.schemaVersion = schemaVersion;
+        this.tableId = tableId;
+        this.columns = columns;
+        this.indexes = indexes;
+    }
+
+    /**
+     * Returns version of the table definition.
+     *
+     * @return Version of the table definition.
+     */
+    public int schemaVersion() {
+        return schemaVersion;
+    }
+
+    /**
+     * Returns ID of the table.
+     *
+     * @return ID of the table
+     */
+    public int tableId() {
+        return tableId;
+    }
+
+    /**
+     * Returns definitions of the columns of the table.
+     *
+     * @return Definitions of the columns of the table.
+     */
+    public List<TableColumnDescriptor> columns() {
+        return columns;
+    }
+
+    /**
+     * Returns definitions of indexes belonging to the table.
+     *
+     * @return Definitions of indexes belonging to the table.
+     */
+    public List<IndexDescriptor> indexes() {
+        return indexes;
+    }
+
+    /**
+     * Computes a diff of this schema from a previous schema.
+     *
+     * @param prevSchema Previous table schema.
+     * @return Difference between the schemas.
+     */
+    public TableDefinitionDiff diffFrom(FullTableSchema prevSchema) {
+        Map<String, TableColumnDescriptor> prevColumnsByName = prevSchema.columns.stream()
+                .collect(toMap(TableColumnDescriptor::name, identity()));
+        Map<String, TableColumnDescriptor> thisColumnsByName = this.columns.stream()
+                .collect(toMap(TableColumnDescriptor::name, identity()));
+
+        Set<String> addedColumnNames = subtract(thisColumnsByName.keySet(), prevColumnsByName.keySet());
+        Set<String> removedColumnNames = subtract(prevColumnsByName.keySet(), thisColumnsByName.keySet());
+
+        List<TableColumnDescriptor> addedColumns = thisColumnsByName.values().stream()
+                .filter(col -> addedColumnNames.contains(col.name()))
+                .collect(toList());
+        List<TableColumnDescriptor> removedColumns = prevColumnsByName.values().stream()
+                .filter(col -> removedColumnNames.contains(col.name()))
+                .collect(toList());
+
+        Set<String> intersectionColumnNames = intersect(thisColumnsByName.keySet(), prevColumnsByName.keySet());
+        List<ColumnDefinitionDiff> changedColumns = new ArrayList<>();
+        for (String commonColumnName : intersectionColumnNames) {
+            TableColumnDescriptor prevColumn = prevColumnsByName.get(commonColumnName);
+            TableColumnDescriptor thisColumn = thisColumnsByName.get(commonColumnName);
+
+            if (columnChanged(prevColumn, thisColumn)) {
+                changedColumns.add(new ColumnDefinitionDiff(prevColumn, thisColumn));
+            }
+        }
+
+        Map<String, IndexDescriptor> prevIndexesByName = prevSchema.indexes.stream()
+                .collect(toMap(IndexDescriptor::name, identity()));
+        Map<String, IndexDescriptor> thisIndexesByName = this.indexes.stream()
+                .collect(toMap(IndexDescriptor::name, identity()));
+
+        Set<String> addedIndexNames = subtract(thisIndexesByName.keySet(), prevIndexesByName.keySet());
+        Set<String> removedIndexNames = subtract(prevIndexesByName.keySet(), thisIndexesByName.keySet());
+
+        List<IndexDescriptor> addedIndexes = thisIndexesByName.values().stream()
+                .filter(col -> addedIndexNames.contains(col.name()))

Review Comment:
   Same here



##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java:
##########
@@ -976,13 +983,34 @@ private CompletableFuture<Void> processTxFinishAction(TxFinishReplicaRequest req
         List<TablePartitionId> aggregatedGroupIds = request.groups().values().stream()
                 .flatMap(List::stream)
                 .map(IgniteBiTuple::get1)
-                .collect(Collectors.toList());
+                .collect(toList());
 
         UUID txId = request.txId();
 
-        boolean commit = request.commit();
+        return schemaCompatValidator.validateForwards(txId, aggregatedGroupIds, request.commit(), request.commitTimestamp())

Review Comment:
   API of the `schemaCompatValidator` is a little bit counter-intuitive to me: we should only validate a request if it's a commit request. I understand that we do this check *inside* `validateForwards` but I still think it would be more clear to do that outside this mehod



##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/schema/FullTableSchema.java:
##########
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.table.distributed.schema;
+
+import static java.util.function.Function.identity;
+import static java.util.stream.Collectors.toList;
+import static java.util.stream.Collectors.toMap;
+
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import org.apache.ignite.internal.catalog.descriptors.IndexDescriptor;
+import org.apache.ignite.internal.catalog.descriptors.TableColumnDescriptor;
+
+/**
+ * Represents a full table schema: that is, the definition of the table and all objects (indexes, constraints, etc)
+ * that belong to the table.
+ */
+public class FullTableSchema {
+    private final int schemaVersion;
+    private final int tableId;
+
+    private final List<TableColumnDescriptor> columns;
+
+    private final List<IndexDescriptor> indexes;
+
+    /**
+     * Constructor.
+     */
+    public FullTableSchema(
+            int schemaVersion,
+            int tableId,
+            List<TableColumnDescriptor> columns,
+            List<IndexDescriptor> indexes
+    ) {
+        this.schemaVersion = schemaVersion;
+        this.tableId = tableId;
+        this.columns = columns;
+        this.indexes = indexes;
+    }
+
+    /**
+     * Returns version of the table definition.
+     *
+     * @return Version of the table definition.
+     */
+    public int schemaVersion() {
+        return schemaVersion;
+    }
+
+    /**
+     * Returns ID of the table.
+     *
+     * @return ID of the table
+     */
+    public int tableId() {
+        return tableId;
+    }
+
+    /**
+     * Returns definitions of the columns of the table.
+     *
+     * @return Definitions of the columns of the table.
+     */
+    public List<TableColumnDescriptor> columns() {
+        return columns;
+    }
+
+    /**
+     * Returns definitions of indexes belonging to the table.
+     *
+     * @return Definitions of indexes belonging to the table.
+     */
+    public List<IndexDescriptor> indexes() {
+        return indexes;
+    }
+
+    /**
+     * Computes a diff of this schema from a previous schema.
+     *
+     * @param prevSchema Previous table schema.
+     * @return Difference between the schemas.
+     */
+    public TableDefinitionDiff diffFrom(FullTableSchema prevSchema) {
+        Map<String, TableColumnDescriptor> prevColumnsByName = prevSchema.columns.stream()
+                .collect(toMap(TableColumnDescriptor::name, identity()));

Review Comment:
   Maybe it would be a little bit cleaner to extract this code into a method like `groupByName`, because it's used 4 times in this method.



##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/schema/FullTableSchema.java:
##########
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.table.distributed.schema;
+
+import static java.util.function.Function.identity;
+import static java.util.stream.Collectors.toList;
+import static java.util.stream.Collectors.toMap;
+
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import org.apache.ignite.internal.catalog.descriptors.IndexDescriptor;
+import org.apache.ignite.internal.catalog.descriptors.TableColumnDescriptor;
+
+/**
+ * Represents a full table schema: that is, the definition of the table and all objects (indexes, constraints, etc)
+ * that belong to the table.
+ */
+public class FullTableSchema {
+    private final int schemaVersion;
+    private final int tableId;
+
+    private final List<TableColumnDescriptor> columns;
+
+    private final List<IndexDescriptor> indexes;
+
+    /**
+     * Constructor.
+     */
+    public FullTableSchema(
+            int schemaVersion,
+            int tableId,
+            List<TableColumnDescriptor> columns,
+            List<IndexDescriptor> indexes
+    ) {
+        this.schemaVersion = schemaVersion;
+        this.tableId = tableId;
+        this.columns = columns;
+        this.indexes = indexes;
+    }
+
+    /**
+     * Returns version of the table definition.
+     *
+     * @return Version of the table definition.
+     */
+    public int schemaVersion() {
+        return schemaVersion;
+    }
+
+    /**
+     * Returns ID of the table.
+     *
+     * @return ID of the table
+     */
+    public int tableId() {
+        return tableId;
+    }
+
+    /**
+     * Returns definitions of the columns of the table.
+     *
+     * @return Definitions of the columns of the table.
+     */
+    public List<TableColumnDescriptor> columns() {
+        return columns;
+    }
+
+    /**
+     * Returns definitions of indexes belonging to the table.
+     *
+     * @return Definitions of indexes belonging to the table.
+     */
+    public List<IndexDescriptor> indexes() {
+        return indexes;
+    }
+
+    /**
+     * Computes a diff of this schema from a previous schema.
+     *
+     * @param prevSchema Previous table schema.
+     * @return Difference between the schemas.
+     */
+    public TableDefinitionDiff diffFrom(FullTableSchema prevSchema) {
+        Map<String, TableColumnDescriptor> prevColumnsByName = prevSchema.columns.stream()
+                .collect(toMap(TableColumnDescriptor::name, identity()));
+        Map<String, TableColumnDescriptor> thisColumnsByName = this.columns.stream()
+                .collect(toMap(TableColumnDescriptor::name, identity()));
+
+        Set<String> addedColumnNames = subtract(thisColumnsByName.keySet(), prevColumnsByName.keySet());
+        Set<String> removedColumnNames = subtract(prevColumnsByName.keySet(), thisColumnsByName.keySet());
+
+        List<TableColumnDescriptor> addedColumns = thisColumnsByName.values().stream()
+                .filter(col -> addedColumnNames.contains(col.name()))

Review Comment:
   This is strange, `thisColumnsByName` is already a map from column name to the column you are looking for. Can we do `addedColumnNames.stream().map(thisColumnsByName::get)` instead?



##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/schema/FullTableSchema.java:
##########
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.table.distributed.schema;
+
+import static java.util.function.Function.identity;
+import static java.util.stream.Collectors.toList;
+import static java.util.stream.Collectors.toMap;
+
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import org.apache.ignite.internal.catalog.descriptors.IndexDescriptor;
+import org.apache.ignite.internal.catalog.descriptors.TableColumnDescriptor;
+
+/**
+ * Represents a full table schema: that is, the definition of the table and all objects (indexes, constraints, etc)
+ * that belong to the table.
+ */
+public class FullTableSchema {
+    private final int schemaVersion;
+    private final int tableId;
+
+    private final List<TableColumnDescriptor> columns;
+
+    private final List<IndexDescriptor> indexes;
+
+    /**
+     * Constructor.
+     */
+    public FullTableSchema(
+            int schemaVersion,
+            int tableId,
+            List<TableColumnDescriptor> columns,
+            List<IndexDescriptor> indexes
+    ) {
+        this.schemaVersion = schemaVersion;
+        this.tableId = tableId;
+        this.columns = columns;
+        this.indexes = indexes;
+    }
+
+    /**
+     * Returns version of the table definition.
+     *
+     * @return Version of the table definition.
+     */
+    public int schemaVersion() {
+        return schemaVersion;
+    }
+
+    /**
+     * Returns ID of the table.
+     *
+     * @return ID of the table
+     */
+    public int tableId() {
+        return tableId;
+    }
+
+    /**
+     * Returns definitions of the columns of the table.
+     *
+     * @return Definitions of the columns of the table.
+     */
+    public List<TableColumnDescriptor> columns() {
+        return columns;
+    }
+
+    /**
+     * Returns definitions of indexes belonging to the table.
+     *
+     * @return Definitions of indexes belonging to the table.
+     */
+    public List<IndexDescriptor> indexes() {
+        return indexes;
+    }
+
+    /**
+     * Computes a diff of this schema from a previous schema.
+     *
+     * @param prevSchema Previous table schema.
+     * @return Difference between the schemas.
+     */
+    public TableDefinitionDiff diffFrom(FullTableSchema prevSchema) {
+        Map<String, TableColumnDescriptor> prevColumnsByName = prevSchema.columns.stream()
+                .collect(toMap(TableColumnDescriptor::name, identity()));
+        Map<String, TableColumnDescriptor> thisColumnsByName = this.columns.stream()
+                .collect(toMap(TableColumnDescriptor::name, identity()));
+
+        Set<String> addedColumnNames = subtract(thisColumnsByName.keySet(), prevColumnsByName.keySet());
+        Set<String> removedColumnNames = subtract(prevColumnsByName.keySet(), thisColumnsByName.keySet());
+
+        List<TableColumnDescriptor> addedColumns = thisColumnsByName.values().stream()
+                .filter(col -> addedColumnNames.contains(col.name()))
+                .collect(toList());
+        List<TableColumnDescriptor> removedColumns = prevColumnsByName.values().stream()
+                .filter(col -> removedColumnNames.contains(col.name()))

Review Comment:
   same here



##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java:
##########
@@ -976,13 +983,34 @@ private CompletableFuture<Void> processTxFinishAction(TxFinishReplicaRequest req
         List<TablePartitionId> aggregatedGroupIds = request.groups().values().stream()
                 .flatMap(List::stream)
                 .map(IgniteBiTuple::get1)
-                .collect(Collectors.toList());
+                .collect(toList());
 
         UUID txId = request.txId();
 
-        boolean commit = request.commit();
+        return schemaCompatValidator.validateForwards(txId, aggregatedGroupIds, request.commit(), request.commitTimestamp())
+                .thenCompose(validationResult -> {
+                    boolean stillCommit = request.commit() && validationResult.isOk();
 
-        CompletableFuture<Object> changeStateFuture = finishTransaction(aggregatedGroupIds, txId, commit);
+                    return finishAndCleanup(request, stillCommit, aggregatedGroupIds, txId)
+                            .thenCompose(result -> {

Review Comment:
   You should use `thenAccept` here instead of `thenCompose`



##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/ForwardValidationResult.java:
##########
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.table.distributed.replicator;
+
+import java.util.Objects;
+import java.util.UUID;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Result of forward schema compatibility validation.
+ */
+public class ForwardValidationResult {
+    private static final ForwardValidationResult SUCCESS = new ForwardValidationResult(true, null, -1, -1);
+
+    private final boolean ok;
+    @Nullable
+    private final UUID failedTableId;
+    private final int fromSchemaVersion;
+    private final int toSchemaVersion;
+
+    /**
+     * Returns a successful validation result.
+     *
+     * @return A successful validation result.
+     */
+    public static ForwardValidationResult success() {
+        return SUCCESS;
+    }
+
+    /**
+     * Creates a validation result denoting validation failure.
+     *
+     * @param failedTableId Table which schema change is incompatible.
+     * @param fromSchemaVersion Version number of the schema from which an incompatible transition tried to be made.
+     * @param toSchemaVersion Version number of the schema to which an incompatible transition tried to be made.
+     * @return A validation result for a failure.
+     */
+    public static ForwardValidationResult failure(UUID failedTableId, int fromSchemaVersion, int toSchemaVersion) {
+        return new ForwardValidationResult(false, failedTableId, fromSchemaVersion, toSchemaVersion);
+    }
+
+    private ForwardValidationResult(boolean ok, @Nullable UUID failedTableId, int fromSchemaVersion, int toSchemaVersion) {
+        this.ok = ok;
+        this.failedTableId = failedTableId;
+        this.fromSchemaVersion = fromSchemaVersion;
+        this.toSchemaVersion = toSchemaVersion;
+    }
+
+    /**
+     * Returns whether the validation was successful.
+     *
+     * @return Whether the validation was successful
+     */
+    public boolean isOk() {
+        return ok;
+    }
+
+    /**
+     * Returns ID of the table for which the validation has failed. Should only be called for a failed validation result,
+     * otherwise an exception is thrown.
+     *
+     * @return Table ID.
+     */
+    public UUID failedTableId() {
+        return Objects.requireNonNull(failedTableId);
+    }
+
+    /**
+     * Returns version number of the schema from which an incompatible transition tried to be made.
+     *
+     * @return Version number of the schema from which an incompatible transition tried to be made.
+     */
+    public int fromSchemaVersion() {
+        return fromSchemaVersion;
+    }
+
+    /**
+     * Returns version number of the schema to which an incompatible transition tried to be made.
+     *
+     * @return Version number of the schema to which an incompatible transition tried to be made.
+     */
+    public int toSchemaVersion() {
+        return toSchemaVersion;

Review Comment:
   And here?



##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/schema/NonHistoricSchemas.java:
##########
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.table.distributed.schema;
+
+import static java.util.concurrent.CompletableFuture.completedFuture;
+import static java.util.stream.Collectors.toList;
+import static org.apache.ignite.lang.ErrorGroups.Client.PROTOCOL_ERR;
+
+import java.util.List;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import org.apache.ignite.internal.catalog.commands.DefaultValue;
+import org.apache.ignite.internal.catalog.descriptors.TableColumnDescriptor;
+import org.apache.ignite.internal.hlc.HybridTimestamp;
+import org.apache.ignite.internal.schema.Column;
+import org.apache.ignite.internal.schema.NativeType;
+import org.apache.ignite.internal.schema.NativeTypeSpec;
+import org.apache.ignite.internal.schema.SchemaDescriptor;
+import org.apache.ignite.internal.schema.SchemaManager;
+import org.apache.ignite.internal.schema.SchemaRegistry;
+import org.apache.ignite.lang.IgniteException;
+import org.apache.ignite.sql.ColumnType;
+
+/**
+ * A dummy implementation over {@link SchemaManager}. It is dummy because:
+ *
+ * <ul>
+ *     <li>It imitates historicity, but always takes the latest known schema</li>
+ *     <li>{@link #tableSchemaVersionsBetween(UUID, HybridTimestamp, HybridTimestamp)} always returns a single schema to avoid
+ *     validation failures</li>
+ * </ul>
+ *
+ * <p>The point of this implementation is to allow the system work in the pre-SchemaSync fashion before the switch to CatalogService
+ * is possible.
+ */
+// TODO: IGNITE-19447 - remove when switched to the CatalogService
+public class NonHistoricSchemas implements Schemas {
+    private final SchemaManager schemaManager;
+
+    public NonHistoricSchemas(SchemaManager schemaManager) {
+        this.schemaManager = schemaManager;
+    }
+
+    @Override
+    public CompletableFuture<?> waitForSchemasAvailability(HybridTimestamp ts) {
+        return completedFuture(null);
+    }
+
+    @Override
+    public List<FullTableSchema> tableSchemaVersionsBetween(UUID tableId, HybridTimestamp fromIncluding, HybridTimestamp toIncluding) {
+        SchemaRegistry schemaRegistry = schemaManager.schemaRegistry(tableId);
+
+        SchemaDescriptor schemaDescriptor = schemaRegistry.schema();
+        FullTableSchema fullSchema = new FullTableSchema(
+                1,
+                1,
+                schemaDescriptor.columnNames().stream()

Review Comment:
   Please extract this into a variable



##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/schema/NonHistoricSchemas.java:
##########
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.table.distributed.schema;
+
+import static java.util.concurrent.CompletableFuture.completedFuture;
+import static java.util.stream.Collectors.toList;
+import static org.apache.ignite.lang.ErrorGroups.Client.PROTOCOL_ERR;
+
+import java.util.List;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import org.apache.ignite.internal.catalog.commands.DefaultValue;
+import org.apache.ignite.internal.catalog.descriptors.TableColumnDescriptor;
+import org.apache.ignite.internal.hlc.HybridTimestamp;
+import org.apache.ignite.internal.schema.Column;
+import org.apache.ignite.internal.schema.NativeType;
+import org.apache.ignite.internal.schema.NativeTypeSpec;
+import org.apache.ignite.internal.schema.SchemaDescriptor;
+import org.apache.ignite.internal.schema.SchemaManager;
+import org.apache.ignite.internal.schema.SchemaRegistry;
+import org.apache.ignite.lang.IgniteException;
+import org.apache.ignite.sql.ColumnType;
+
+/**
+ * A dummy implementation over {@link SchemaManager}. It is dummy because:
+ *
+ * <ul>
+ *     <li>It imitates historicity, but always takes the latest known schema</li>
+ *     <li>{@link #tableSchemaVersionsBetween(UUID, HybridTimestamp, HybridTimestamp)} always returns a single schema to avoid
+ *     validation failures</li>
+ * </ul>
+ *
+ * <p>The point of this implementation is to allow the system work in the pre-SchemaSync fashion before the switch to CatalogService
+ * is possible.
+ */
+// TODO: IGNITE-19447 - remove when switched to the CatalogService
+public class NonHistoricSchemas implements Schemas {
+    private final SchemaManager schemaManager;
+
+    public NonHistoricSchemas(SchemaManager schemaManager) {
+        this.schemaManager = schemaManager;
+    }
+
+    @Override
+    public CompletableFuture<?> waitForSchemasAvailability(HybridTimestamp ts) {
+        return completedFuture(null);
+    }
+
+    @Override
+    public List<FullTableSchema> tableSchemaVersionsBetween(UUID tableId, HybridTimestamp fromIncluding, HybridTimestamp toIncluding) {
+        SchemaRegistry schemaRegistry = schemaManager.schemaRegistry(tableId);
+
+        SchemaDescriptor schemaDescriptor = schemaRegistry.schema();
+        FullTableSchema fullSchema = new FullTableSchema(
+                1,
+                1,
+                schemaDescriptor.columnNames().stream()
+                        .map(colName -> {
+                            Column column = schemaDescriptor.column(colName);
+
+                            assert column != null;
+
+                            return columnType(colName, column);
+                        })
+                        .collect(toList()),
+                List.of()
+        );
+
+        return List.of(fullSchema);
+    }
+
+    private static TableColumnDescriptor columnType(String colName, Column column) {
+        return new TableColumnDescriptor(
+                colName,
+                columnTypeFromNativeType(column.type()),
+                column.nullable(),
+                DefaultValue.constant(column.defaultValue())
+        );
+    }
+
+    private static ColumnType columnTypeFromNativeType(NativeType type) {
+        return getColumnType(type.spec());
+    }
+
+    /**
+     * Detects {@link ColumnType} by {@link NativeTypeSpec}.
+     *
+     * @param spec Native type spec.
+     * @return Detected {@link ColumnType}.
+     */
+    public static ColumnType getColumnType(NativeTypeSpec spec) {
+        switch (spec) {
+            case INT8:
+                return ColumnType.INT8;
+
+            case INT16:
+                return ColumnType.INT16;
+
+            case INT32:
+                return ColumnType.INT32;
+
+            case INT64:
+                return ColumnType.INT64;
+
+            case FLOAT:
+                return ColumnType.FLOAT;
+
+            case DOUBLE:
+                return ColumnType.DOUBLE;
+
+            case DECIMAL:
+                return ColumnType.DECIMAL;
+
+            case NUMBER:
+                return ColumnType.NUMBER;
+
+            case UUID:
+                return ColumnType.UUID;
+
+            case STRING:
+                return ColumnType.STRING;
+
+            case BYTES:
+                return ColumnType.BYTE_ARRAY;
+
+            case BITMASK:
+                return ColumnType.BITMASK;
+
+            case DATE:
+                return ColumnType.DATE;
+
+            case TIME:
+                return ColumnType.TIME;
+
+            case DATETIME:
+                return ColumnType.DATETIME;
+
+            case TIMESTAMP:
+                return ColumnType.TIMESTAMP;
+
+            default:
+                throw new IgniteException(PROTOCOL_ERR, "Unsupported native type: " + spec);

Review Comment:
   Wait a second, you indeed copy-pasted it from `ClientTableCommon`, shame on you!



##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/schema/NonHistoricSchemas.java:
##########
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.table.distributed.schema;
+
+import static java.util.concurrent.CompletableFuture.completedFuture;
+import static java.util.stream.Collectors.toList;
+import static org.apache.ignite.lang.ErrorGroups.Client.PROTOCOL_ERR;
+
+import java.util.List;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import org.apache.ignite.internal.catalog.commands.DefaultValue;
+import org.apache.ignite.internal.catalog.descriptors.TableColumnDescriptor;
+import org.apache.ignite.internal.hlc.HybridTimestamp;
+import org.apache.ignite.internal.schema.Column;
+import org.apache.ignite.internal.schema.NativeType;
+import org.apache.ignite.internal.schema.NativeTypeSpec;
+import org.apache.ignite.internal.schema.SchemaDescriptor;
+import org.apache.ignite.internal.schema.SchemaManager;
+import org.apache.ignite.internal.schema.SchemaRegistry;
+import org.apache.ignite.lang.IgniteException;
+import org.apache.ignite.sql.ColumnType;
+
+/**
+ * A dummy implementation over {@link SchemaManager}. It is dummy because:
+ *
+ * <ul>
+ *     <li>It imitates historicity, but always takes the latest known schema</li>
+ *     <li>{@link #tableSchemaVersionsBetween(UUID, HybridTimestamp, HybridTimestamp)} always returns a single schema to avoid
+ *     validation failures</li>
+ * </ul>
+ *
+ * <p>The point of this implementation is to allow the system work in the pre-SchemaSync fashion before the switch to CatalogService
+ * is possible.
+ */
+// TODO: IGNITE-19447 - remove when switched to the CatalogService
+public class NonHistoricSchemas implements Schemas {
+    private final SchemaManager schemaManager;
+
+    public NonHistoricSchemas(SchemaManager schemaManager) {
+        this.schemaManager = schemaManager;
+    }
+
+    @Override
+    public CompletableFuture<?> waitForSchemasAvailability(HybridTimestamp ts) {
+        return completedFuture(null);
+    }
+
+    @Override
+    public List<FullTableSchema> tableSchemaVersionsBetween(UUID tableId, HybridTimestamp fromIncluding, HybridTimestamp toIncluding) {
+        SchemaRegistry schemaRegistry = schemaManager.schemaRegistry(tableId);
+
+        SchemaDescriptor schemaDescriptor = schemaRegistry.schema();
+        FullTableSchema fullSchema = new FullTableSchema(
+                1,
+                1,
+                schemaDescriptor.columnNames().stream()
+                        .map(colName -> {
+                            Column column = schemaDescriptor.column(colName);
+
+                            assert column != null;
+
+                            return columnType(colName, column);
+                        })
+                        .collect(toList()),
+                List.of()
+        );
+
+        return List.of(fullSchema);
+    }
+
+    private static TableColumnDescriptor columnType(String colName, Column column) {

Review Comment:
   Should be called `columnDescriptor`



##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/schema/NonHistoricSchemas.java:
##########
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.table.distributed.schema;
+
+import static java.util.concurrent.CompletableFuture.completedFuture;
+import static java.util.stream.Collectors.toList;
+import static org.apache.ignite.lang.ErrorGroups.Client.PROTOCOL_ERR;
+
+import java.util.List;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import org.apache.ignite.internal.catalog.commands.DefaultValue;
+import org.apache.ignite.internal.catalog.descriptors.TableColumnDescriptor;
+import org.apache.ignite.internal.hlc.HybridTimestamp;
+import org.apache.ignite.internal.schema.Column;
+import org.apache.ignite.internal.schema.NativeType;
+import org.apache.ignite.internal.schema.NativeTypeSpec;
+import org.apache.ignite.internal.schema.SchemaDescriptor;
+import org.apache.ignite.internal.schema.SchemaManager;
+import org.apache.ignite.internal.schema.SchemaRegistry;
+import org.apache.ignite.lang.IgniteException;
+import org.apache.ignite.sql.ColumnType;
+
+/**
+ * A dummy implementation over {@link SchemaManager}. It is dummy because:
+ *
+ * <ul>
+ *     <li>It imitates historicity, but always takes the latest known schema</li>
+ *     <li>{@link #tableSchemaVersionsBetween(UUID, HybridTimestamp, HybridTimestamp)} always returns a single schema to avoid
+ *     validation failures</li>
+ * </ul>
+ *
+ * <p>The point of this implementation is to allow the system work in the pre-SchemaSync fashion before the switch to CatalogService
+ * is possible.
+ */
+// TODO: IGNITE-19447 - remove when switched to the CatalogService
+public class NonHistoricSchemas implements Schemas {
+    private final SchemaManager schemaManager;
+
+    public NonHistoricSchemas(SchemaManager schemaManager) {
+        this.schemaManager = schemaManager;
+    }
+
+    @Override
+    public CompletableFuture<?> waitForSchemasAvailability(HybridTimestamp ts) {
+        return completedFuture(null);
+    }
+
+    @Override
+    public List<FullTableSchema> tableSchemaVersionsBetween(UUID tableId, HybridTimestamp fromIncluding, HybridTimestamp toIncluding) {
+        SchemaRegistry schemaRegistry = schemaManager.schemaRegistry(tableId);
+
+        SchemaDescriptor schemaDescriptor = schemaRegistry.schema();
+        FullTableSchema fullSchema = new FullTableSchema(
+                1,
+                1,
+                schemaDescriptor.columnNames().stream()
+                        .map(colName -> {
+                            Column column = schemaDescriptor.column(colName);
+
+                            assert column != null;
+
+                            return columnType(colName, column);
+                        })
+                        .collect(toList()),
+                List.of()
+        );
+
+        return List.of(fullSchema);
+    }
+
+    private static TableColumnDescriptor columnType(String colName, Column column) {
+        return new TableColumnDescriptor(
+                colName,
+                columnTypeFromNativeType(column.type()),
+                column.nullable(),
+                DefaultValue.constant(column.defaultValue())
+        );
+    }
+
+    private static ColumnType columnTypeFromNativeType(NativeType type) {

Review Comment:
   This method seems redundant to me



##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/schema/NonHistoricSchemas.java:
##########
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.table.distributed.schema;
+
+import static java.util.concurrent.CompletableFuture.completedFuture;
+import static java.util.stream.Collectors.toList;
+import static org.apache.ignite.lang.ErrorGroups.Client.PROTOCOL_ERR;
+
+import java.util.List;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import org.apache.ignite.internal.catalog.commands.DefaultValue;
+import org.apache.ignite.internal.catalog.descriptors.TableColumnDescriptor;
+import org.apache.ignite.internal.hlc.HybridTimestamp;
+import org.apache.ignite.internal.schema.Column;
+import org.apache.ignite.internal.schema.NativeType;
+import org.apache.ignite.internal.schema.NativeTypeSpec;
+import org.apache.ignite.internal.schema.SchemaDescriptor;
+import org.apache.ignite.internal.schema.SchemaManager;
+import org.apache.ignite.internal.schema.SchemaRegistry;
+import org.apache.ignite.lang.IgniteException;
+import org.apache.ignite.sql.ColumnType;
+
+/**
+ * A dummy implementation over {@link SchemaManager}. It is dummy because:
+ *
+ * <ul>
+ *     <li>It imitates historicity, but always takes the latest known schema</li>
+ *     <li>{@link #tableSchemaVersionsBetween(UUID, HybridTimestamp, HybridTimestamp)} always returns a single schema to avoid
+ *     validation failures</li>
+ * </ul>
+ *
+ * <p>The point of this implementation is to allow the system work in the pre-SchemaSync fashion before the switch to CatalogService
+ * is possible.
+ */
+// TODO: IGNITE-19447 - remove when switched to the CatalogService
+public class NonHistoricSchemas implements Schemas {
+    private final SchemaManager schemaManager;
+
+    public NonHistoricSchemas(SchemaManager schemaManager) {
+        this.schemaManager = schemaManager;
+    }
+
+    @Override
+    public CompletableFuture<?> waitForSchemasAvailability(HybridTimestamp ts) {
+        return completedFuture(null);
+    }
+
+    @Override
+    public List<FullTableSchema> tableSchemaVersionsBetween(UUID tableId, HybridTimestamp fromIncluding, HybridTimestamp toIncluding) {
+        SchemaRegistry schemaRegistry = schemaManager.schemaRegistry(tableId);
+
+        SchemaDescriptor schemaDescriptor = schemaRegistry.schema();
+        FullTableSchema fullSchema = new FullTableSchema(
+                1,
+                1,
+                schemaDescriptor.columnNames().stream()
+                        .map(colName -> {
+                            Column column = schemaDescriptor.column(colName);
+
+                            assert column != null;
+
+                            return columnType(colName, column);
+                        })
+                        .collect(toList()),
+                List.of()
+        );
+
+        return List.of(fullSchema);
+    }
+
+    private static TableColumnDescriptor columnType(String colName, Column column) {
+        return new TableColumnDescriptor(
+                colName,
+                columnTypeFromNativeType(column.type()),
+                column.nullable(),
+                DefaultValue.constant(column.defaultValue())

Review Comment:
   Are sure this is correct? There are multiple types of default values



##########
modules/table/src/testFixtures/java/org/apache/ignite/internal/table/impl/DummySchemas.java:
##########
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.table.impl;
+
+import static java.util.concurrent.CompletableFuture.completedFuture;
+import static java.util.stream.Collectors.toList;
+
+import java.util.List;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import org.apache.ignite.internal.catalog.commands.DefaultValue;
+import org.apache.ignite.internal.catalog.descriptors.TableColumnDescriptor;
+import org.apache.ignite.internal.hlc.HybridTimestamp;
+import org.apache.ignite.internal.schema.Column;
+import org.apache.ignite.internal.schema.NativeType;
+import org.apache.ignite.internal.schema.SchemaDescriptor;
+import org.apache.ignite.internal.schema.SchemaRegistry;
+import org.apache.ignite.internal.table.distributed.schema.FullTableSchema;
+import org.apache.ignite.internal.table.distributed.schema.NonHistoricSchemas;
+import org.apache.ignite.internal.table.distributed.schema.Schemas;
+import org.apache.ignite.sql.ColumnType;
+
+/**
+ * Dummy {@link Schemas} implementation that is not historic and always uses same {@link SchemaRegistry}.
+ */
+public class DummySchemas implements Schemas {
+    private final SchemaRegistry schemaRegistry;
+
+    public DummySchemas(SchemaRegistry schemaRegistry) {
+        this.schemaRegistry = schemaRegistry;
+    }
+
+    @Override
+    public CompletableFuture<?> waitForSchemasAvailability(HybridTimestamp ts) {
+        return completedFuture(null);
+    }
+
+    @Override
+    public List<FullTableSchema> tableSchemaVersionsBetween(UUID tableId, HybridTimestamp fromIncluding, HybridTimestamp toIncluding) {
+        SchemaDescriptor schemaDescriptor = schemaRegistry.schema();
+        FullTableSchema fullSchema = new FullTableSchema(
+                1,
+                1,
+                schemaDescriptor.columnNames().stream()
+                        .map(colName -> {
+                            Column column = schemaDescriptor.column(colName);
+
+                            assert column != null;
+
+                            return columnType(colName, column);
+                        })
+                        .collect(toList()),
+                List.of()
+        );
+
+        return List.of(fullSchema);
+    }
+
+    private static TableColumnDescriptor columnType(String colName, Column column) {
+        return new TableColumnDescriptor(
+                colName,
+                columnTypeFromNativeType(column.type()),
+                column.nullable(),
+                DefaultValue.constant(column.defaultValue())
+        );
+    }
+
+    private static ColumnType columnTypeFromNativeType(NativeType type) {
+        return NonHistoricSchemas.getColumnType(type.spec());

Review Comment:
   Why do you reuse this method from `NonHistoricSchemas` and not the method that creates a `TableColumnDescriptor` from a `Column`?



##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/schema/FullTableSchema.java:
##########
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.table.distributed.schema;
+
+import static java.util.function.Function.identity;
+import static java.util.stream.Collectors.toList;
+import static java.util.stream.Collectors.toMap;
+
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import org.apache.ignite.internal.catalog.descriptors.IndexDescriptor;
+import org.apache.ignite.internal.catalog.descriptors.TableColumnDescriptor;
+
+/**
+ * Represents a full table schema: that is, the definition of the table and all objects (indexes, constraints, etc)
+ * that belong to the table.
+ */
+public class FullTableSchema {
+    private final int schemaVersion;
+    private final int tableId;
+
+    private final List<TableColumnDescriptor> columns;
+
+    private final List<IndexDescriptor> indexes;
+
+    /**
+     * Constructor.
+     */
+    public FullTableSchema(
+            int schemaVersion,
+            int tableId,
+            List<TableColumnDescriptor> columns,
+            List<IndexDescriptor> indexes
+    ) {
+        this.schemaVersion = schemaVersion;
+        this.tableId = tableId;
+        this.columns = columns;
+        this.indexes = indexes;
+    }
+
+    /**
+     * Returns version of the table definition.
+     *
+     * @return Version of the table definition.
+     */
+    public int schemaVersion() {
+        return schemaVersion;
+    }
+
+    /**
+     * Returns ID of the table.
+     *
+     * @return ID of the table
+     */
+    public int tableId() {
+        return tableId;
+    }
+
+    /**
+     * Returns definitions of the columns of the table.
+     *
+     * @return Definitions of the columns of the table.
+     */
+    public List<TableColumnDescriptor> columns() {
+        return columns;
+    }
+
+    /**
+     * Returns definitions of indexes belonging to the table.
+     *
+     * @return Definitions of indexes belonging to the table.
+     */
+    public List<IndexDescriptor> indexes() {
+        return indexes;
+    }
+
+    /**
+     * Computes a diff of this schema from a previous schema.
+     *
+     * @param prevSchema Previous table schema.
+     * @return Difference between the schemas.
+     */
+    public TableDefinitionDiff diffFrom(FullTableSchema prevSchema) {
+        Map<String, TableColumnDescriptor> prevColumnsByName = prevSchema.columns.stream()
+                .collect(toMap(TableColumnDescriptor::name, identity()));
+        Map<String, TableColumnDescriptor> thisColumnsByName = this.columns.stream()
+                .collect(toMap(TableColumnDescriptor::name, identity()));
+
+        Set<String> addedColumnNames = subtract(thisColumnsByName.keySet(), prevColumnsByName.keySet());
+        Set<String> removedColumnNames = subtract(prevColumnsByName.keySet(), thisColumnsByName.keySet());
+
+        List<TableColumnDescriptor> addedColumns = thisColumnsByName.values().stream()
+                .filter(col -> addedColumnNames.contains(col.name()))

Review Comment:
   Also, this way you don't have to collect `addedColumnNames` into a separate set and avoid some copying, you can filter the `thisColumnsByName.keySet()` in-place



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@ignite.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org