Posted to notifications@ignite.apache.org by "rpuch (via GitHub)" <gi...@apache.org> on 2023/05/10 14:47:33 UTC

[GitHub] [ignite-3] rpuch opened a new pull request, #2055: IGNITE-19228 Schema validation during tx processing: common framework

rpuch opened a new pull request, #2055:
URL: https://github.com/apache/ignite-3/pull/2055

   https://issues.apache.org/jira/browse/IGNITE-19228


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@ignite.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [ignite-3] sashapolo commented on a diff in pull request #2055: IGNITE-19228 Schema validation during tx processing: common framework

Posted by "sashapolo (via GitHub)" <gi...@apache.org>.
sashapolo commented on code in PR #2055:
URL: https://github.com/apache/ignite-3/pull/2055#discussion_r1192235317


##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/schema/NonHistoricSchemas.java:
##########
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.table.distributed.schema;
+
+import static java.util.concurrent.CompletableFuture.completedFuture;
+import static java.util.stream.Collectors.toList;
+import static org.apache.ignite.lang.ErrorGroups.Client.PROTOCOL_ERR;
+
+import java.util.List;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import org.apache.ignite.internal.catalog.commands.DefaultValue;
+import org.apache.ignite.internal.catalog.descriptors.TableColumnDescriptor;
+import org.apache.ignite.internal.hlc.HybridTimestamp;
+import org.apache.ignite.internal.schema.Column;
+import org.apache.ignite.internal.schema.NativeType;
+import org.apache.ignite.internal.schema.NativeTypeSpec;
+import org.apache.ignite.internal.schema.SchemaDescriptor;
+import org.apache.ignite.internal.schema.SchemaManager;
+import org.apache.ignite.internal.schema.SchemaRegistry;
+import org.apache.ignite.lang.IgniteException;
+import org.apache.ignite.sql.ColumnType;
+
+/**
+ * A dummy implementation over {@link SchemaManager}. It is dummy because:
+ *
+ * <ul>
+ *     <li>It imitates historicity, but always takes the latest known schema</li>
+ *     <li>{@link #tableSchemaVersionsBetween(UUID, HybridTimestamp, HybridTimestamp)} always returns a single schema to avoid
+ *     validation failures</li>
+ * </ul>
+ *
+ * <p>The point of this implementation is to allow the system work in the pre-SchemaSync fashion before the switch to CatalogService
+ * is possible.
+ */
+// TODO: IGNITE-19447 - remove when switched to the CatalogService
+public class NonHistoricSchemas implements Schemas {
+    private final SchemaManager schemaManager;
+
+    public NonHistoricSchemas(SchemaManager schemaManager) {
+        this.schemaManager = schemaManager;
+    }
+
+    @Override
+    public CompletableFuture<?> waitForSchemasAvailability(HybridTimestamp ts) {
+        return completedFuture(null);
+    }
+
+    @Override
+    public List<FullTableSchema> tableSchemaVersionsBetween(UUID tableId, HybridTimestamp fromIncluding, HybridTimestamp toIncluding) {
+        SchemaRegistry schemaRegistry = schemaManager.schemaRegistry(tableId);
+
+        SchemaDescriptor schemaDescriptor = schemaRegistry.schema();
+        FullTableSchema fullSchema = new FullTableSchema(
+                1,
+                1,
+                schemaDescriptor.columnNames().stream()
+                        .map(colName -> {
+                            Column column = schemaDescriptor.column(colName);
+
+                            assert column != null;
+
+                            return columnType(colName, column);
+                        })
+                        .collect(toList()),
+                List.of()
+        );
+
+        return List.of(fullSchema);
+    }
+
+    private static TableColumnDescriptor columnType(String colName, Column column) {
+        return new TableColumnDescriptor(
+                colName,
+                columnTypeFromNativeType(column.type()),
+                column.nullable(),
+                DefaultValue.constant(column.defaultValue())

Review Comment:
   got it





[GitHub] [ignite-3] rpuch commented on a diff in pull request #2055: IGNITE-19228 Schema validation during tx processing: common framework

Posted by "rpuch (via GitHub)" <gi...@apache.org>.
rpuch commented on code in PR #2055:
URL: https://github.com/apache/ignite-3/pull/2055#discussion_r1191189480


##########
modules/core/src/testFixtures/java/org/apache/ignite/internal/testframework/asserts/CompletableFutureAssert.java:
##########
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.testframework.asserts;
+
+import static org.junit.jupiter.api.Assertions.fail;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+/**
+ * Assertions related to {@link CompletableFuture}.
+ */
+public class CompletableFutureAssert {

Review Comment:
   I disagree. This class lets the test code that follows the assertion inspect the thrown exception (analogously to `assertThrows()`) instead of requiring a matcher, which is more natural and easier.
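
   To illustrate the style described above, here is a minimal sketch, not the actual `CompletableFutureAssert` code from the PR. The class name `FutureAssertSketch`, the method `assertFailsWith`, and the one-second timeout are assumptions made for illustration; it throws a plain `AssertionError` instead of calling JUnit's `fail()` so that it stays self-contained.

   ```java
   import java.util.concurrent.CompletableFuture;
   import java.util.concurrent.ExecutionException;
   import java.util.concurrent.TimeUnit;
   import java.util.concurrent.TimeoutException;

   // Hypothetical sketch; names and timeout are illustrative assumptions.
   final class FutureAssertSketch {
       /** Asserts that the future fails with the expected exception type and returns that exception. */
       static <X extends Throwable> X assertFailsWith(CompletableFuture<?> future, Class<X> expectedType) {
           try {
               future.get(1, TimeUnit.SECONDS);
           } catch (ExecutionException e) {
               Throwable cause = e.getCause();
               if (expectedType.isInstance(cause)) {
                   // Hand the exception back so the test can inspect it directly.
                   return expectedType.cast(cause);
               }
               throw new AssertionError("Unexpected exception type: " + cause, cause);
           } catch (InterruptedException e) {
               Thread.currentThread().interrupt();
               throw new AssertionError("Interrupted while waiting for the future", e);
           } catch (TimeoutException e) {
               throw new AssertionError("Future did not complete in time", e);
           }
           throw new AssertionError("Future completed normally, but a failure was expected");
       }

       public static void main(String[] args) {
           CompletableFuture<String> future = new CompletableFuture<>();
           future.completeExceptionally(new IllegalStateException("boom"));

           // The returned exception is inspected by ordinary code, no matcher needed.
           IllegalStateException ex = assertFailsWith(future, IllegalStateException.class);
           System.out.println(ex.getMessage());
       }
   }
   ```

   The returned exception can then be checked with ordinary assertions in the lines that follow, which is the `assertThrows()`-like usage the comment refers to.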





[GitHub] [ignite-3] sashapolo commented on a diff in pull request #2055: IGNITE-19228 Schema validation during tx processing: common framework

Posted by "sashapolo (via GitHub)" <gi...@apache.org>.
sashapolo commented on code in PR #2055:
URL: https://github.com/apache/ignite-3/pull/2055#discussion_r1191990740


##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/schema/FullTableSchema.java:
##########
@@ -125,34 +117,24 @@ public TableDefinitionDiff diffFrom(FullTableSchema prevSchema) {
             }
         }
 
-        Map<String, IndexDescriptor> prevIndexesByName = prevSchema.indexes.stream()
-                .collect(toMap(IndexDescriptor::name, identity()));
-        Map<String, IndexDescriptor> thisIndexesByName = this.indexes.stream()
-                .collect(toMap(IndexDescriptor::name, identity()));
+        Map<String, IndexDescriptor> prevIndexesByName = toMapByName(prevSchema.indexes, IndexDescriptor::name);
+        Map<String, IndexDescriptor> thisIndexesByName = toMapByName(this.indexes, IndexDescriptor::name);
 
-        Set<String> addedIndexNames = subtract(thisIndexesByName.keySet(), prevIndexesByName.keySet());
-        Set<String> removedIndexNames = subtract(prevIndexesByName.keySet(), thisIndexesByName.keySet());
-
-        List<IndexDescriptor> addedIndexes = thisIndexesByName.values().stream()
-                .filter(col -> addedIndexNames.contains(col.name()))
-                .collect(toList());
-        List<IndexDescriptor> removedIndexes = prevIndexesByName.values().stream()
-                .filter(col -> removedIndexNames.contains(col.name()))
-                .collect(toList());
+        List<IndexDescriptor> addedIndexes = subtractKeyed(thisIndexesByName, prevIndexesByName);
+        List<IndexDescriptor> removedIndexes = subtractKeyed(prevIndexesByName, thisIndexesByName);
 
         return new TableDefinitionDiff(addedColumns, removedColumns, changedColumns, addedIndexes, removedIndexes);
     }
 
-    private static Set<String> subtract(Set<String> minuend, Set<String> subtrahend) {
-        Set<String> result = new HashSet<>(minuend);
-        result.removeAll(subtrahend);
-        return result;
+    private static <T> Map<String, T> toMapByName(List<T> elements, Function<T, String> nameExtractor) {
+        return elements.stream().collect(toMap(nameExtractor, identity()));
     }
 
-    private static Set<String> intersect(Set<String> set1, Set<String> set2) {
-        Set<String> result = new HashSet<>(set1);
-        result.retainAll(set2);
-        return result;
+    private static <T> List<T> subtractKeyed(Map<String, T> diminuend, Map<String, T> subtrahend) {
+        return diminuend.keySet().stream()

Review Comment:
   You can use `entrySet` here to avoid calling `get` afterwards
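
   A minimal sketch of that suggestion follows. The quoted diff cuts off before the body of `subtractKeyed`, so this assumes the method keeps the values whose keys are absent from the second map; the class name `SubtractKeyedSketch` and the parameter name `minuend` are illustrative choices, not code from the PR.

   ```java
   import java.util.List;
   import java.util.Map;
   import java.util.stream.Collectors;

   // Hypothetical sketch; not the PR's actual implementation.
   final class SubtractKeyedSketch {
       /** Returns the values of {@code minuend} whose keys are not present in {@code subtrahend}. */
       static <T> List<T> subtractKeyed(Map<String, T> minuend, Map<String, T> subtrahend) {
           return minuend.entrySet().stream()
                   // Filter on the key while the value is still at hand...
                   .filter(entry -> !subtrahend.containsKey(entry.getKey()))
                   // ...so no second lookup via get() is needed afterwards.
                   .map(Map.Entry::getValue)
                   .collect(Collectors.toList());
       }

       public static void main(String[] args) {
           Map<String, Integer> current = Map.of("idx_a", 1, "idx_b", 2);
           Map<String, Integer> previous = Map.of("idx_b", 2);

           System.out.println(subtractKeyed(current, previous));
       }
   }
   ```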



##########
modules/schema/src/main/java/org/apache/ignite/internal/schema/NativeTypeSpec.java:
##########
@@ -355,6 +359,66 @@ public static Class<?> toClass(NativeTypeSpec spec, boolean nullable) {
         }
     }
 
+    /**
+     * Gets client type corresponding to this server type.
+     *
+     * @return Client type code.
+     */
+    public ColumnType asColumnType() {
+        switch (this) {
+            case INT8:
+                return ColumnType.INT8;
+
+            case INT16:
+                return ColumnType.INT16;
+
+            case INT32:
+                return ColumnType.INT32;
+
+            case INT64:
+                return ColumnType.INT64;
+
+            case FLOAT:
+                return ColumnType.FLOAT;
+
+            case DOUBLE:
+                return ColumnType.DOUBLE;
+
+            case DECIMAL:
+                return ColumnType.DECIMAL;
+
+            case NUMBER:
+                return ColumnType.NUMBER;
+
+            case UUID:
+                return ColumnType.UUID;
+
+            case STRING:
+                return ColumnType.STRING;
+
+            case BYTES:
+                return ColumnType.BYTE_ARRAY;
+
+            case BITMASK:
+                return ColumnType.BITMASK;
+
+            case DATE:
+                return ColumnType.DATE;
+
+            case TIME:
+                return ColumnType.TIME;
+
+            case DATETIME:
+                return ColumnType.DATETIME;
+
+            case TIMESTAMP:
+                return ColumnType.TIMESTAMP;
+
+            default:
+                throw new IgniteException(PROTOCOL_ERR, "Unsupported native type: " + this);

Review Comment:
   I don't think using `PROTOCOL_ERR` is correct here, because it's an error type from the Thin Client



##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/schema/FullTableSchema.java:
##########
@@ -125,34 +117,24 @@ public TableDefinitionDiff diffFrom(FullTableSchema prevSchema) {
             }
         }
 
-        Map<String, IndexDescriptor> prevIndexesByName = prevSchema.indexes.stream()
-                .collect(toMap(IndexDescriptor::name, identity()));
-        Map<String, IndexDescriptor> thisIndexesByName = this.indexes.stream()
-                .collect(toMap(IndexDescriptor::name, identity()));
+        Map<String, IndexDescriptor> prevIndexesByName = toMapByName(prevSchema.indexes, IndexDescriptor::name);
+        Map<String, IndexDescriptor> thisIndexesByName = toMapByName(this.indexes, IndexDescriptor::name);
 
-        Set<String> addedIndexNames = subtract(thisIndexesByName.keySet(), prevIndexesByName.keySet());
-        Set<String> removedIndexNames = subtract(prevIndexesByName.keySet(), thisIndexesByName.keySet());
-
-        List<IndexDescriptor> addedIndexes = thisIndexesByName.values().stream()
-                .filter(col -> addedIndexNames.contains(col.name()))
-                .collect(toList());
-        List<IndexDescriptor> removedIndexes = prevIndexesByName.values().stream()
-                .filter(col -> removedIndexNames.contains(col.name()))
-                .collect(toList());
+        List<IndexDescriptor> addedIndexes = subtractKeyed(thisIndexesByName, prevIndexesByName);
+        List<IndexDescriptor> removedIndexes = subtractKeyed(prevIndexesByName, thisIndexesByName);
 
         return new TableDefinitionDiff(addedColumns, removedColumns, changedColumns, addedIndexes, removedIndexes);
     }
 
-    private static Set<String> subtract(Set<String> minuend, Set<String> subtrahend) {
-        Set<String> result = new HashSet<>(minuend);
-        result.removeAll(subtrahend);
-        return result;
+    private static <T> Map<String, T> toMapByName(List<T> elements, Function<T, String> nameExtractor) {
+        return elements.stream().collect(toMap(nameExtractor, identity()));
     }
 
-    private static Set<String> intersect(Set<String> set1, Set<String> set2) {
-        Set<String> result = new HashSet<>(set1);
-        result.retainAll(set2);
-        return result;
+    private static <T> List<T> subtractKeyed(Map<String, T> diminuend, Map<String, T> subtrahend) {

Review Comment:
   `diminuend`? What does that mean?



##########
modules/table/src/main/java/org/apache/ignite/internal/utils/RebalanceUtil.java:
##########
@@ -468,7 +468,7 @@ public static CompletableFuture<Void> handleReduceChanged(MetaStorageManager met
      * @return Result of the subtraction.
      */
     public static <T> Set<T> subtract(Set<T> minuend, Set<T> subtrahend) {

Review Comment:
   Can we get rid of this method?



##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/RebalanceRaftGroupEventsListener.java:
##########
@@ -347,12 +347,12 @@ private void doOnNewPeersConfigurationApplied(PeersAndLearners configuration) {
             // For further addition
             Set<Assignment> calculatedSwitchAppend = union(retrievedSwitchAppend, reducedNodes);
             calculatedSwitchAppend = subtract(calculatedSwitchAppend, addedNodes);
-            calculatedSwitchAppend = intersect(calculatedAssignments, calculatedSwitchAppend);
+            calculatedSwitchAppend = CollectionUtils.intersect(calculatedAssignments, calculatedSwitchAppend);

Review Comment:
   I propose adding a static import here; it looks strange otherwise.



##########
modules/core/src/main/java/org/apache/ignite/internal/util/CollectionUtils.java:
##########
@@ -596,4 +597,15 @@ public static <T> Set<T> difference(@Nullable Set<T> a, @Nullable Set<T> b) {
     public static IntSet setOf(Collection<Integer> coll) {
         return IntSets.unmodifiable(new IntOpenHashSet(coll));
     }
+
+    /**
+     * Returns an intersection of two sets.
+     *
+     * @param op1 First operand.
+     * @param op2 Second operand.
+     * @return Result of the intersection.
+     */
+    public static <T> Set<T> intersect(Set<T> op1, Set<T> op2) {

Review Comment:
   No subtrahend =(



##########
modules/core/src/test/java/org/apache/ignite/internal/util/CollectionUtilsTest.java:
##########
@@ -359,4 +361,9 @@ private void testSetOf(Collection<Integer> data) {
         assertThrows(UnsupportedOperationException.class, () -> copy.add(42));
         assertThrows(UnsupportedOperationException.class, () -> copy.remove(3));
     }
+
+    @Test
+    void testIntersect() {

Review Comment:
   Let's add a test for an empty intersection as well
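
   Such a check could look roughly like the sketch below. The body of `CollectionUtils.intersect` is not shown in the diff, so this uses a stand-in modelled on the private `intersect` helper removed from `FullTableSchema` (copy the first set, then `retainAll`); the class name `IntersectSketch` is hypothetical, and plain `AssertionError`s replace JUnit assertions to keep it self-contained.

   ```java
   import java.util.HashSet;
   import java.util.Set;

   // Hypothetical stand-in for the utility under test; not the PR's code.
   final class IntersectSketch {
       /** Returns a new set containing the elements present in both operands. */
       static <T> Set<T> intersect(Set<T> op1, Set<T> op2) {
           Set<T> result = new HashSet<>(op1);
           result.retainAll(op2);
           return result;
       }

       public static void main(String[] args) {
           // Overlapping operands: only the shared elements remain.
           if (!intersect(Set.of(1, 2, 3), Set.of(2, 3, 4)).equals(Set.of(2, 3))) {
               throw new AssertionError("Expected the common elements only");
           }

           // Disjoint operands: the intersection must be empty.
           if (!intersect(Set.of(1, 2), Set.of(3, 4)).isEmpty()) {
               throw new AssertionError("Expected an empty intersection");
           }
       }
   }
   ```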



##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/SchemaCompatValidator.java:
##########
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.table.distributed.replicator;
+
+import static java.util.concurrent.CompletableFuture.completedFuture;
+import static java.util.stream.Collectors.toList;
+
+import java.util.List;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import org.apache.ignite.internal.hlc.HybridTimestamp;
+import org.apache.ignite.internal.replicator.TablePartitionId;
+import org.apache.ignite.internal.table.distributed.schema.FullTableSchema;
+import org.apache.ignite.internal.table.distributed.schema.Schemas;
+import org.apache.ignite.internal.table.distributed.schema.TableDefinitionDiff;
+import org.apache.ignite.internal.tx.TransactionIds;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Validates schema compatibility.
+ */
+class SchemaCompatValidator {
+    private final Schemas schemas;
+
+    SchemaCompatValidator(Schemas schemas) {
+        this.schemas = schemas;
+    }
+
+    /**
+     * Performs forward compatibility validation (if needed). That is, for each table enlisted in the transaction,
+     * checks to see whether the initial schema (identified by the begin timestamp) is forward-compatible with the
+     * commit schema (identified by the commit timestamp).
+     *
+     * <p>If doing an abort (and not commit), the validation always succeeds.
+     *
+     * @param txId ID of the transaction that gets validated.
+     * @param enlistedGroupIds IDs of the partitions that are enlisted with the transaction.
+     * @param commit Whether it's a commit attempt (otherwise it's an abort).
+     * @param commitTimestamp Commit timestamp (or {@code null} if it's an abort).
+     * @return Future completed with validation result.
+     */
+    CompletableFuture<ForwardValidationResult> validateForwards(
+            UUID txId,
+            List<TablePartitionId> enlistedGroupIds,
+            boolean commit,
+            @Nullable HybridTimestamp commitTimestamp
+    ) {
+        if (!commit) {
+            return completedFuture(ForwardValidationResult.success());
+        }
+
+        HybridTimestamp beginTimestamp = TransactionIds.beginTimestamp(txId);
+
+        List<UUID> tableIds = enlistedGroupIds.stream()

Review Comment:
   I think it looks ok if extracted into a method:
   ```
   ForwardValidationResult validateTablesSchemas(
           List<TablePartitionId> enlistedGroupIds,
           HybridTimestamp beginTimestamp,
           HybridTimestamp commitTimestamp
   ) {
       return enlistedGroupIds.stream()
               .map(TablePartitionId::tableId)
               .distinct()
               .map(tableId -> validateSchemaCompatibility(beginTimestamp, commitTimestamp, tableId))
               .filter(validationResult -> !validationResult.isSuccessful())
               .findAny()
               .orElse(ForwardValidationResult.success());
   }
   ```



##########
modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerTest.java:
##########
@@ -290,6 +314,8 @@ public void beforeTest(@InjectConfiguration DataStorageConfiguration dsCfg) {
 
         lenient().when(safeTimeClock.waitFor(any())).thenReturn(completedFuture(null));
 
+        lenient().when(schemas.waitForSchemasAvailability(any())).thenReturn(completedFuture(null));

Review Comment:
   ok



##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java:
##########
@@ -976,13 +983,34 @@ private CompletableFuture<Void> processTxFinishAction(TxFinishReplicaRequest req
         List<TablePartitionId> aggregatedGroupIds = request.groups().values().stream()
                 .flatMap(List::stream)
                 .map(IgniteBiTuple::get1)
-                .collect(Collectors.toList());
+                .collect(toList());
 
         UUID txId = request.txId();
 
-        boolean commit = request.commit();
+        return schemaCompatValidator.validateForwards(txId, aggregatedGroupIds, request.commit(), request.commitTimestamp())
+                .thenCompose(validationResult -> {
+                    boolean stillCommit = request.commit() && validationResult.isOk();
 
-        CompletableFuture<Object> changeStateFuture = finishTransaction(aggregatedGroupIds, txId, commit);
+                    return finishAndCleanup(request, stillCommit, aggregatedGroupIds, txId)
+                            .thenCompose(result -> {
+                                if (validationResult.isOk()) {
+                                    return completedFuture(result);
+                                } else {
+                                    return failedFuture(new AbortDueToIncompatibleSchemaException("Commit failed because schema "

Review Comment:
   oops, I missed the `stillCommit` part



##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java:
##########
@@ -976,13 +983,34 @@ private CompletableFuture<Void> processTxFinishAction(TxFinishReplicaRequest req
         List<TablePartitionId> aggregatedGroupIds = request.groups().values().stream()
                 .flatMap(List::stream)
                 .map(IgniteBiTuple::get1)
-                .collect(Collectors.toList());
+                .collect(toList());
 
         UUID txId = request.txId();
 
-        boolean commit = request.commit();
+        return schemaCompatValidator.validateForwards(txId, aggregatedGroupIds, request.commit(), request.commitTimestamp())

Review Comment:
   > But the result is that finishAndCleanup() is called twice in different branches, and this seems weird.
   
   I don't know, but it looks perfectly fine to me:
   
   ```
   if (request.commit()) {
       return schemaCompatValidator.validateForwards(txId, aggregatedGroupIds, request.commitTimestamp())
               .thenCompose(validationResult -> {
                   return finishAndCleanup(request, validationResult.isSuccessful(), aggregatedGroupIds, txId)
                           .thenCompose(result -> failureIfValidationFailed(validationResult));
               });
   } else {
       return finishAndCleanup(request, false, aggregatedGroupIds, txId);
   }
   ```



##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/SchemaCompatValidator.java:
##########
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.table.distributed.replicator;
+
+import static java.util.concurrent.CompletableFuture.completedFuture;
+import static java.util.stream.Collectors.toList;
+
+import java.util.List;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import org.apache.ignite.internal.hlc.HybridTimestamp;
+import org.apache.ignite.internal.replicator.TablePartitionId;
+import org.apache.ignite.internal.table.distributed.schema.FullTableSchema;
+import org.apache.ignite.internal.table.distributed.schema.Schemas;
+import org.apache.ignite.internal.table.distributed.schema.TableDefinitionDiff;
+import org.apache.ignite.internal.tx.TransactionIds;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Validates schema compatibility.
+ */
+class SchemaCompatValidator {
+    private final Schemas schemas;
+
+    SchemaCompatValidator(Schemas schemas) {
+        this.schemas = schemas;
+    }
+
+    /**
+     * Performs forward compatibility validation (if needed). That is, for each table enlisted in the transaction,
+     * checks to see whether the initial schema (identified by the begin timestamp) is forward-compatible with the
+     * commit schema (identified by the commit timestamp).
+     *
+     * <p>If doing an abort (and not commit), the validation always succeeds.
+     *
+     * @param txId ID of the transaction that gets validated.
+     * @param enlistedGroupIds IDs of the partitions that are enlisted with the transaction.
+     * @param commit Whether it's a commit attempt (otherwise it's an abort).
+     * @param commitTimestamp Commit timestamp (or {@code null} if it's an abort).
+     * @return Future completed with validation result.
+     */
+    CompletableFuture<ForwardValidationResult> validateForwards(
+            UUID txId,
+            List<TablePartitionId> enlistedGroupIds,
+            boolean commit,
+            @Nullable HybridTimestamp commitTimestamp
+    ) {
+        if (!commit) {
+            return completedFuture(ForwardValidationResult.success());
+        }
+
+        HybridTimestamp beginTimestamp = TransactionIds.beginTimestamp(txId);
+
+        List<UUID> tableIds = enlistedGroupIds.stream()

Review Comment:
   Or, if you still prefer your `for` loop, I think it's better to use a `Set` instead of `distinct().toList()`
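
   A minimal sketch of the `Set` variant, under stated assumptions: `PartitionId` is a hypothetical stand-in for `TablePartitionId` (only the table ID matters here), and `DistinctTableIdsSketch` and `tableIds` are illustrative names rather than code from the PR.

   ```java
   import java.util.List;
   import java.util.Set;
   import java.util.UUID;
   import java.util.stream.Collectors;

   // Hypothetical sketch; PartitionId merely imitates TablePartitionId.
   final class DistinctTableIdsSketch {
       /** Stand-in for a partition ID: a table ID plus a partition number. */
       static final class PartitionId {
           final UUID tableId;
           final int partition;

           PartitionId(UUID tableId, int partition) {
               this.tableId = tableId;
               this.partition = partition;
           }
       }

       /** Collects the IDs of the enlisted tables; the Set removes duplicates by construction. */
       static Set<UUID> tableIds(List<PartitionId> enlistedGroupIds) {
           return enlistedGroupIds.stream()
                   .map(id -> id.tableId)
                   .collect(Collectors.toSet());
       }

       public static void main(String[] args) {
           UUID tableA = UUID.randomUUID();
           UUID tableB = UUID.randomUUID();

           List<PartitionId> enlisted = List.of(
                   new PartitionId(tableA, 0),
                   new PartitionId(tableA, 1),
                   new PartitionId(tableB, 0)
           );

           // Each table is visited once, however many of its partitions are enlisted.
           for (UUID tableId : tableIds(enlisted)) {
               System.out.println(tableId);
           }
       }
   }
   ```

   The return type then documents the uniqueness guarantee, which a `List` produced through `distinct()` does not.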





[GitHub] [ignite-3] sashapolo commented on a diff in pull request #2055: IGNITE-19228 Schema validation during tx processing: common framework

Posted by "sashapolo (via GitHub)" <gi...@apache.org>.
sashapolo commented on code in PR #2055:
URL: https://github.com/apache/ignite-3/pull/2055#discussion_r1191987555


##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/schema/NonHistoricSchemas.java:
##########
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.table.distributed.schema;
+
+import static java.util.concurrent.CompletableFuture.completedFuture;
+import static java.util.stream.Collectors.toList;
+import static org.apache.ignite.lang.ErrorGroups.Client.PROTOCOL_ERR;
+
+import java.util.List;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import org.apache.ignite.internal.catalog.commands.DefaultValue;
+import org.apache.ignite.internal.catalog.descriptors.TableColumnDescriptor;
+import org.apache.ignite.internal.hlc.HybridTimestamp;
+import org.apache.ignite.internal.schema.Column;
+import org.apache.ignite.internal.schema.NativeType;
+import org.apache.ignite.internal.schema.NativeTypeSpec;
+import org.apache.ignite.internal.schema.SchemaDescriptor;
+import org.apache.ignite.internal.schema.SchemaManager;
+import org.apache.ignite.internal.schema.SchemaRegistry;
+import org.apache.ignite.lang.IgniteException;
+import org.apache.ignite.sql.ColumnType;
+
+/**
+ * A dummy implementation over {@link SchemaManager}. It is dummy because:
+ *
+ * <ul>
+ *     <li>It imitates historicity, but always takes the latest known schema</li>
+ *     <li>{@link #tableSchemaVersionsBetween(UUID, HybridTimestamp, HybridTimestamp)} always returns a single schema to avoid
+ *     validation failures</li>
+ * </ul>
+ *
+ * <p>The point of this implementation is to allow the system work in the pre-SchemaSync fashion before the switch to CatalogService
+ * is possible.
+ */
+// TODO: IGNITE-19447 - remove when switched to the CatalogService
+public class NonHistoricSchemas implements Schemas {
+    private final SchemaManager schemaManager;
+
+    public NonHistoricSchemas(SchemaManager schemaManager) {
+        this.schemaManager = schemaManager;
+    }
+
+    @Override
+    public CompletableFuture<?> waitForSchemasAvailability(HybridTimestamp ts) {
+        return completedFuture(null);
+    }
+
+    @Override
+    public List<FullTableSchema> tableSchemaVersionsBetween(UUID tableId, HybridTimestamp fromIncluding, HybridTimestamp toIncluding) {
+        SchemaRegistry schemaRegistry = schemaManager.schemaRegistry(tableId);
+
+        SchemaDescriptor schemaDescriptor = schemaRegistry.schema();
+        FullTableSchema fullSchema = new FullTableSchema(
+                1,
+                1,
+                schemaDescriptor.columnNames().stream()
+                        .map(colName -> {
+                            Column column = schemaDescriptor.column(colName);
+
+                            assert column != null;
+
+                            return columnType(colName, column);
+                        })
+                        .collect(toList()),
+                List.of()
+        );
+
+        return List.of(fullSchema);
+    }
+
+    private static TableColumnDescriptor columnType(String colName, Column column) {
+        return new TableColumnDescriptor(
+                colName,
+                columnTypeFromNativeType(column.type()),
+                column.nullable(),
+                DefaultValue.constant(column.defaultValue())

Review Comment:
   It looks like there is also a functional default value type. Will this conversion work in that case?





[GitHub] [ignite-3] rpuch commented on a diff in pull request #2055: IGNITE-19228 Schema validation during tx processing: common framework

Posted by "rpuch (via GitHub)" <gi...@apache.org>.
rpuch commented on code in PR #2055:
URL: https://github.com/apache/ignite-3/pull/2055#discussion_r1191309331


##########
modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerTest.java:
##########
@@ -290,6 +314,8 @@ public void beforeTest(@InjectConfiguration DataStorageConfiguration dsCfg) {
 
         lenient().when(safeTimeClock.waitFor(any())).thenReturn(completedFuture(null));
 
+        lenient().when(schemas.waitForSchemasAvailability(any())).thenReturn(completedFuture(null));

Review Comment:
   I'm not sure it's a good thing to just make all stubbings lenient. I would prefer to be explicit.





[GitHub] [ignite-3] sashapolo commented on a diff in pull request #2055: IGNITE-19228 Schema validation during tx processing: common framework

Posted by "sashapolo (via GitHub)" <gi...@apache.org>.
sashapolo commented on code in PR #2055:
URL: https://github.com/apache/ignite-3/pull/2055#discussion_r1192246761


##########
modules/core/src/testFixtures/java/org/apache/ignite/internal/testframework/asserts/CompletableFutureAssert.java:
##########
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.testframework.asserts;
+
+import static org.junit.jupiter.api.Assertions.fail;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+/**
+ * Assertions related to {@link CompletableFuture}.
+ */
+public class CompletableFutureAssert {

Review Comment:
   OK, my suggestion was a bit too eager and does not cover your case, I agree. I still don't like the logic being duplicated between two classes, but OK, let's keep this class.





[GitHub] [ignite-3] rpuch commented on a diff in pull request #2055: IGNITE-19228 Schema validation during tx processing: common framework

Posted by "rpuch (via GitHub)" <gi...@apache.org>.
rpuch commented on code in PR #2055:
URL: https://github.com/apache/ignite-3/pull/2055#discussion_r1192086808


##########
modules/core/src/testFixtures/java/org/apache/ignite/internal/testframework/asserts/CompletableFutureAssert.java:
##########
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.testframework.asserts;
+
+import static org.junit.jupiter.api.Assertions.fail;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+/**
+ * Assertions related to {@link CompletableFuture}.
+ */
+public class CompletableFutureAssert {

Review Comment:
   A matcher gives a binary answer (matches/does not match) and a description of what does not match. This is a pretty rigid format if you want to make a few assertions. On the other hand, if we just add assertions (throwing an AssertionError) 'inside' a matcher, this looks like an abuse of the idea of a matcher.
   
   Each tool is good for its situation. Sometimes a matcher is the right tool, sometimes direct inspection is better. It seems correct to have both tools and use them accordingly. Moreover, this does not bloat the code much: the class we are discussing is rather small and simple.





[GitHub] [ignite-3] rpuch commented on a diff in pull request #2055: IGNITE-19228 Schema validation during tx processing: common framework

Posted by "rpuch (via GitHub)" <gi...@apache.org>.
rpuch commented on code in PR #2055:
URL: https://github.com/apache/ignite-3/pull/2055#discussion_r1191189480


##########
modules/core/src/testFixtures/java/org/apache/ignite/internal/testframework/asserts/CompletableFutureAssert.java:
##########
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.testframework.asserts;
+
+import static org.junit.jupiter.api.Assertions.fail;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+/**
+ * Assertions related to {@link CompletableFuture}.
+ */
+public class CompletableFutureAssert {

Review Comment:
   I disagree. This class lets the test code that follows the assertion inspect the thrown exception (analogously to `assertThrows()`) instead of writing a matcher, which is easier and often (as in the place where this assertion is used in this PR) more natural.
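
   To make the idea concrete, here is a minimal sketch of an assertion that hands the failure back to the caller. `FutureAssertSketch` and `assertWillThrow` are hypothetical names for illustration, not the class from this PR, and the sketch throws a plain `AssertionError` instead of calling JUnit's `fail()` so that it has no dependencies.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

/** Hypothetical stand-in for the helper under discussion; not the class from the PR. */
final class FutureAssertSketch {
    private FutureAssertSketch() {
    }

    /**
     * Waits for the future to fail and returns the failure cause so the caller can inspect it.
     * Throws AssertionError if the future succeeds, times out, or fails with another type.
     */
    static <X extends Throwable> X assertWillThrow(
            CompletableFuture<?> future, Class<X> expected, long timeout, TimeUnit unit
    ) {
        try {
            future.get(timeout, unit);
        } catch (ExecutionException e) {
            Throwable cause = e.getCause();
            if (expected.isInstance(cause)) {
                return expected.cast(cause);
            }
            throw new AssertionError("Expected " + expected.getName() + " but got " + cause, cause);
        } catch (TimeoutException e) {
            throw new AssertionError("Future did not complete in time", e);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new AssertionError("Interrupted while waiting", e);
        }
        throw new AssertionError("Expected the future to fail, but it completed normally");
    }

    public static void main(String[] args) {
        CompletableFuture<String> failed =
                CompletableFuture.failedFuture(new IllegalStateException("schema changed"));

        // The returned exception can be examined with ordinary assertions afterwards.
        IllegalStateException ex =
                assertWillThrow(failed, IllegalStateException.class, 1, TimeUnit.SECONDS);
        System.out.println(ex.getMessage()); // prints "schema changed"
    }
}
```

   Returning the exception keeps follow-up checks in the test body, which is the same shape as `assertThrows()`.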





[GitHub] [ignite-3] rpuch commented on a diff in pull request #2055: IGNITE-19228 Schema validation during tx processing: common framework

Posted by "rpuch (via GitHub)" <gi...@apache.org>.
rpuch commented on code in PR #2055:
URL: https://github.com/apache/ignite-3/pull/2055#discussion_r1192257742


##########
modules/table/src/main/java/org/apache/ignite/internal/utils/RebalanceUtil.java:
##########
@@ -468,7 +468,7 @@ public static CompletableFuture<Void> handleReduceChanged(MetaStorageManager met
      * @return Result of the subtraction.
      */
     public static <T> Set<T> subtract(Set<T> minuend, Set<T> subtrahend) {

Review Comment:
   You are right, it's not a big deal. Removed it.





[GitHub] [ignite-3] rpuch commented on a diff in pull request #2055: IGNITE-19228 Schema validation during tx processing: common framework

Posted by "rpuch (via GitHub)" <gi...@apache.org>.
rpuch commented on code in PR #2055:
URL: https://github.com/apache/ignite-3/pull/2055#discussion_r1192243160


##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/SchemaCompatValidator.java:
##########
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.table.distributed.replicator;
+
+import static java.util.concurrent.CompletableFuture.completedFuture;
+import static java.util.stream.Collectors.toList;
+
+import java.util.List;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import org.apache.ignite.internal.hlc.HybridTimestamp;
+import org.apache.ignite.internal.replicator.TablePartitionId;
+import org.apache.ignite.internal.table.distributed.schema.FullTableSchema;
+import org.apache.ignite.internal.table.distributed.schema.Schemas;
+import org.apache.ignite.internal.table.distributed.schema.TableDefinitionDiff;
+import org.apache.ignite.internal.tx.TransactionIds;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Validates schema compatibility.
+ */
+class SchemaCompatValidator {
+    private final Schemas schemas;
+
+    SchemaCompatValidator(Schemas schemas) {
+        this.schemas = schemas;
+    }
+
+    /**
+     * Performs forward compatibility validation (if needed). That is, for each table enlisted in the transaction,
+     * checks to see whether the initial schema (identified by the begin timestamp) is forward-compatible with the
+     * commit schema (identified by the commit timestamp).
+     *
+     * <p>If doing an abort (and not commit), the validation always succeeds.
+     *
+     * @param txId ID of the transaction that gets validated.
+     * @param enlistedGroupIds IDs of the partitions that are enlisted with the transaction.
+     * @param commit Whether it's a commit attempt (otherwise it's an abort).
+     * @param commitTimestamp Commit timestamp (or {@code null} if it's an abort).
+     * @return Future completed with validation result.
+     */
+    CompletableFuture<ForwardValidationResult> validateForwards(
+            UUID txId,
+            List<TablePartitionId> enlistedGroupIds,
+            boolean commit,
+            @Nullable HybridTimestamp commitTimestamp
+    ) {
+        if (!commit) {
+            return completedFuture(ForwardValidationResult.success());
+        }
+
+        HybridTimestamp beginTimestamp = TransactionIds.beginTimestamp(txId);
+
+        List<UUID> tableIds = enlistedGroupIds.stream()

Review Comment:
   Comparing the two options, I still prefer the `for` loop, as the stream version makes you think a bit harder to understand it. But I like the ideas of using a `Set` and extracting the method. I implemented both, thanks.
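
   For reference, a minimal sketch of the shape described above: a `for` loop that fills a `Set`, extracted into its own method. `PartitionId` is a hypothetical stand-in for `TablePartitionId`, and its `tableId` field is an assumption, since the real accessor is not shown in this diff.

```java
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;
import java.util.UUID;

/** Illustration only; not the code from the PR. */
final class EnlistedTablesSketch {
    /** Hypothetical stand-in for TablePartitionId: a table ID plus a partition number. */
    static final class PartitionId {
        final UUID tableId;
        final int partition;

        PartitionId(UUID tableId, int partition) {
            this.tableId = tableId;
            this.partition = partition;
        }
    }

    /** Collects the distinct table IDs behind the enlisted partitions, in first-seen order. */
    static Set<UUID> distinctTableIds(List<PartitionId> enlistedGroupIds) {
        Set<UUID> tableIds = new LinkedHashSet<>();

        for (PartitionId groupId : enlistedGroupIds) {
            // Set.add() ignores repeats, so a table with many partitions is kept once.
            tableIds.add(groupId.tableId);
        }

        return tableIds;
    }

    public static void main(String[] args) {
        UUID first = new UUID(0, 1);
        UUID second = new UUID(0, 2);

        Set<UUID> ids = distinctTableIds(List.of(
                new PartitionId(first, 0),
                new PartitionId(first, 1),
                new PartitionId(second, 0)
        ));

        System.out.println(ids.size()); // prints 2
    }
}
```

   With a `Set`, each table is validated once regardless of how many of its partitions are enlisted.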





[GitHub] [ignite-3] sashapolo commented on a diff in pull request #2055: IGNITE-19228 Schema validation during tx processing: common framework

Posted by "sashapolo (via GitHub)" <gi...@apache.org>.
sashapolo commented on code in PR #2055:
URL: https://github.com/apache/ignite-3/pull/2055#discussion_r1191986278


##########
modules/core/src/testFixtures/java/org/apache/ignite/internal/testframework/asserts/CompletableFutureAssert.java:
##########
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.testframework.asserts;
+
+import static org.junit.jupiter.api.Assertions.fail;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+/**
+ * Assertions related to {@link CompletableFuture}.
+ */
+public class CompletableFutureAssert {

Review Comment:
   Can we add a `Matcher` and an overload that allows us to inspect the exception? This way we can write something like `assertThat(f, willThrow(e -> ...))`. I really want to have a single way to work with such assertions...





[GitHub] [ignite-3] rpuch commented on a diff in pull request #2055: IGNITE-19228 Schema validation during tx processing: common framework

Posted by "rpuch (via GitHub)" <gi...@apache.org>.
rpuch commented on code in PR #2055:
URL: https://github.com/apache/ignite-3/pull/2055#discussion_r1192100097


##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/schema/NonHistoricSchemas.java:
##########
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.table.distributed.schema;
+
+import static java.util.concurrent.CompletableFuture.completedFuture;
+import static java.util.stream.Collectors.toList;
+import static org.apache.ignite.lang.ErrorGroups.Client.PROTOCOL_ERR;
+
+import java.util.List;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import org.apache.ignite.internal.catalog.commands.DefaultValue;
+import org.apache.ignite.internal.catalog.descriptors.TableColumnDescriptor;
+import org.apache.ignite.internal.hlc.HybridTimestamp;
+import org.apache.ignite.internal.schema.Column;
+import org.apache.ignite.internal.schema.NativeType;
+import org.apache.ignite.internal.schema.NativeTypeSpec;
+import org.apache.ignite.internal.schema.SchemaDescriptor;
+import org.apache.ignite.internal.schema.SchemaManager;
+import org.apache.ignite.internal.schema.SchemaRegistry;
+import org.apache.ignite.lang.IgniteException;
+import org.apache.ignite.sql.ColumnType;
+
+/**
+ * A dummy implementation over {@link SchemaManager}. It is dummy because:
+ *
+ * <ul>
+ *     <li>It imitates historicity, but always takes the latest known schema</li>
+ *     <li>{@link #tableSchemaVersionsBetween(UUID, HybridTimestamp, HybridTimestamp)} always returns a single schema to avoid
+ *     validation failures</li>
+ * </ul>
+ *
+ * <p>The point of this implementation is to allow the system work in the pre-SchemaSync fashion before the switch to CatalogService
+ * is possible.
+ */
+// TODO: IGNITE-19447 - remove when switched to the CatalogService
+public class NonHistoricSchemas implements Schemas {
+    private final SchemaManager schemaManager;
+
+    public NonHistoricSchemas(SchemaManager schemaManager) {
+        this.schemaManager = schemaManager;
+    }
+
+    @Override
+    public CompletableFuture<?> waitForSchemasAvailability(HybridTimestamp ts) {
+        return completedFuture(null);
+    }
+
+    @Override
+    public List<FullTableSchema> tableSchemaVersionsBetween(UUID tableId, HybridTimestamp fromIncluding, HybridTimestamp toIncluding) {
+        SchemaRegistry schemaRegistry = schemaManager.schemaRegistry(tableId);
+
+        SchemaDescriptor schemaDescriptor = schemaRegistry.schema();
+        FullTableSchema fullSchema = new FullTableSchema(
+                1,
+                1,
+                schemaDescriptor.columnNames().stream()
+                        .map(colName -> {
+                            Column column = schemaDescriptor.column(colName);
+
+                            assert column != null;
+
+                            return columnType(colName, column);
+                        })
+                        .collect(toList()),
+                List.of()
+        );
+
+        return List.of(fullSchema);
+    }
+
+    private static TableColumnDescriptor columnType(String colName, Column column) {
+        return new TableColumnDescriptor(
+                colName,
+                columnTypeFromNativeType(column.type()),
+                column.nullable(),
+                DefaultValue.constant(column.defaultValue())

Review Comment:
   This is temporary code that will be thrown away when we finish the switch. The column definition that is produced is actually 'fake': it's not used anywhere, because this class is only used for schema validation, and for now it always returns just one schema, so no actual column validation happens.
   
   Also, it's impossible to directly convert `Column`'s functional default value provider (which is a `DefaultValueGenerator`) to `TableColumnDescriptor`'s functional default value (which is a string, just a function name).
   
   I amended the comment to reflect that the conversion is approximate.
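
   A minimal sketch of what 'approximate' can mean here, using hypothetical stand-in types (`SourceDefault`, `TargetDefault`) rather than Ignite's `Column`, `DefaultValueGenerator` or catalog classes. A constant default converts exactly, while a generator has no function name to carry over, so the sketch degrades it to a `null` constant. That fallback is an assumption made for illustration; the one chosen in the PR is not shown in this thread.

```java
import java.util.function.Supplier;

/** Illustration only: every type here is a hypothetical stand-in, none is an Ignite class. */
final class DefaultValueConversionSketch {
    /** Target-side default: a constant value, or the name of a function to call. */
    static final class TargetDefault {
        final Object constant;
        final String functionName; // non-null only for functional defaults

        private TargetDefault(Object constant, String functionName) {
            this.constant = constant;
            this.functionName = functionName;
        }

        static TargetDefault constant(Object value) {
            return new TargetDefault(value, null);
        }
    }

    /** Source-side default: a constant value, or an opaque generator without a name. */
    static final class SourceDefault {
        final Object constant;
        final Supplier<Object> generator; // null for constant defaults

        SourceDefault(Object constant, Supplier<Object> generator) {
            this.constant = constant;
            this.generator = generator;
        }
    }

    /** Constants map exactly; generators are degraded because they carry no function name. */
    static TargetDefault toTarget(SourceDefault source) {
        if (source.generator == null) {
            return TargetDefault.constant(source.constant);
        }

        // Assumed fallback: nothing faithful can be emitted, so use a placeholder constant.
        return TargetDefault.constant(null);
    }

    public static void main(String[] args) {
        TargetDefault fromConstant = toTarget(new SourceDefault(42, null));
        TargetDefault fromGenerator = toTarget(new SourceDefault(null, () -> "generated"));

        System.out.println(fromConstant.constant);  // prints 42
        System.out.println(fromGenerator.constant); // prints null
    }
}
```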





[GitHub] [ignite-3] sashapolo commented on a diff in pull request #2055: IGNITE-19228 Schema validation during tx processing: common framework

Posted by "sashapolo (via GitHub)" <gi...@apache.org>.
sashapolo commented on code in PR #2055:
URL: https://github.com/apache/ignite-3/pull/2055#discussion_r1191986772


##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java:
##########
@@ -976,13 +983,34 @@ private CompletableFuture<Void> processTxFinishAction(TxFinishReplicaRequest req
         List<TablePartitionId> aggregatedGroupIds = request.groups().values().stream()
                 .flatMap(List::stream)
                 .map(IgniteBiTuple::get1)
-                .collect(Collectors.toList());
+                .collect(toList());
 
         UUID txId = request.txId();
 
-        boolean commit = request.commit();
+        return schemaCompatValidator.validateForwards(txId, aggregatedGroupIds, request.commit(), request.commitTimestamp())
+                .thenCompose(validationResult -> {
+                    boolean stillCommit = request.commit() && validationResult.isOk();
 
-        CompletableFuture<Object> changeStateFuture = finishTransaction(aggregatedGroupIds, txId, commit);
+                    return finishAndCleanup(request, stillCommit, aggregatedGroupIds, txId)
+                            .thenCompose(result -> {

Review Comment:
   yes





[GitHub] [ignite-3] rpuch commented on a diff in pull request #2055: IGNITE-19228 Schema validation during tx processing: common framework

Posted by "rpuch (via GitHub)" <gi...@apache.org>.
rpuch commented on code in PR #2055:
URL: https://github.com/apache/ignite-3/pull/2055#discussion_r1191265498


##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java:
##########
@@ -976,13 +983,34 @@ private CompletableFuture<Void> processTxFinishAction(TxFinishReplicaRequest req
         List<TablePartitionId> aggregatedGroupIds = request.groups().values().stream()
                 .flatMap(List::stream)
                 .map(IgniteBiTuple::get1)
-                .collect(Collectors.toList());
+                .collect(toList());
 
         UUID txId = request.txId();
 
-        boolean commit = request.commit();
+        return schemaCompatValidator.validateForwards(txId, aggregatedGroupIds, request.commit(), request.commitTimestamp())
+                .thenCompose(validationResult -> {
+                    boolean stillCommit = request.commit() && validationResult.isOk();
 
-        CompletableFuture<Object> changeStateFuture = finishTransaction(aggregatedGroupIds, txId, commit);
+                    return finishAndCleanup(request, stillCommit, aggregatedGroupIds, txId)
+                            .thenCompose(result -> {

Review Comment:
   I return a future to signal an exception, so `thenAccept()` does not fit. Do you suggest throwing the exception?
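
   The two options can be compared in a small sketch, using plain `CompletableFuture` and hypothetical method names (`finishWithCompose`, `finishWithAccept`) rather than the code from this PR. Returning a failed future requires `thenCompose()` to flatten the nested future, while throwing from the callback works with `thenAccept()`. In both cases the caller sees the same cause.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

/** Illustration only; method names are hypothetical. */
final class ComposeVsAcceptSketch {
    /** Signals failure by returning a failed future; thenCompose flattens the result. */
    static CompletableFuture<Void> finishWithCompose(CompletableFuture<Boolean> validation) {
        return validation.thenCompose(ok -> {
            if (ok) {
                return CompletableFuture.<Void>completedFuture(null);
            }
            return CompletableFuture.<Void>failedFuture(new IllegalStateException("validation failed"));
        });
    }

    /** Signals failure by throwing; thenAccept completes the returned future exceptionally. */
    static CompletableFuture<Void> finishWithAccept(CompletableFuture<Boolean> validation) {
        return validation.thenAccept(ok -> {
            if (!ok) {
                throw new IllegalStateException("validation failed");
            }
        });
    }

    /** Returns the cause of the failure, or null if the future completed normally. */
    static Throwable failureOf(CompletableFuture<?> future) {
        try {
            future.join();
            return null;
        } catch (CompletionException e) {
            return e.getCause();
        }
    }

    public static void main(String[] args) {
        CompletableFuture<Boolean> invalid = CompletableFuture.completedFuture(false);

        System.out.println(failureOf(finishWithCompose(invalid)) instanceof IllegalStateException); // prints true
        System.out.println(failureOf(finishWithAccept(invalid)) instanceof IllegalStateException);  // prints true
    }
}
```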





[GitHub] [ignite-3] rpuch commented on a diff in pull request #2055: IGNITE-19228 Schema validation during tx processing: common framework

Posted by "rpuch (via GitHub)" <gi...@apache.org>.
rpuch commented on code in PR #2055:
URL: https://github.com/apache/ignite-3/pull/2055#discussion_r1192102623


##########
modules/table/src/main/java/org/apache/ignite/internal/utils/RebalanceUtil.java:
##########
@@ -468,7 +468,7 @@ public static CompletableFuture<Void> handleReduceChanged(MetaStorageManager met
      * @return Result of the subtraction.
      */
     public static <T> Set<T> subtract(Set<T> minuend, Set<T> subtrahend) {

Review Comment:
   Probably not in this PR.





[GitHub] [ignite-3] sashapolo commented on a diff in pull request #2055: IGNITE-19228 Schema validation during tx processing: common framework

Posted by "sashapolo (via GitHub)" <gi...@apache.org>.
sashapolo commented on code in PR #2055:
URL: https://github.com/apache/ignite-3/pull/2055#discussion_r1192235499


##########
modules/table/src/main/java/org/apache/ignite/internal/utils/RebalanceUtil.java:
##########
@@ -468,7 +468,7 @@ public static CompletableFuture<Void> handleReduceChanged(MetaStorageManager met
      * @return Result of the subtraction.
      */
     public static <T> Set<T> subtract(Set<T> minuend, Set<T> subtrahend) {

Review Comment:
   Why? Is it used a lot?





[GitHub] [ignite-3] sashapolo commented on a diff in pull request #2055: IGNITE-19228 Schema validation during tx processing: common framework

Posted by "sashapolo (via GitHub)" <gi...@apache.org>.
sashapolo commented on code in PR #2055:
URL: https://github.com/apache/ignite-3/pull/2055#discussion_r1191986278


##########
modules/core/src/testFixtures/java/org/apache/ignite/internal/testframework/asserts/CompletableFutureAssert.java:
##########
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.testframework.asserts;
+
+import static org.junit.jupiter.api.Assertions.fail;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+/**
+ * Assertions related to {@link CompletableFuture}.
+ */
+public class CompletableFutureAssert {

Review Comment:
   Can we add a `Matcher` and an overload that allows us to inspect the exception? This way we can write something like `assertThat(f, willThrow(e -> ...))`
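
The idea of inspecting the exception can be sketched without any test library. The helper below is a dependency-free stand-in, not the Hamcrest `Matcher` the comment asks for and not an existing Ignite API: the class `FutureFailureCheck`, the `willThrow` signature, and the timeout parameter are assumptions made for illustration.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Predicate;

// Hypothetical helper; names and signature are illustrative only.
final class FutureFailureCheck {
    /**
     * Returns true if the future fails within the timeout and the failure cause satisfies the check.
     * Returns false if the future completes normally, times out, or the wait is interrupted.
     */
    static boolean willThrow(CompletableFuture<?> future, Predicate<Throwable> causeCheck, long timeoutMillis) {
        try {
            future.get(timeoutMillis, TimeUnit.MILLISECONDS);
            return false; // Completed normally, so nothing was thrown.
        } catch (ExecutionException e) {
            // get() wraps the original failure; inspect the cause.
            return causeCheck.test(e.getCause());
        } catch (TimeoutException e) {
            return false;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        CompletableFuture<Void> failed = CompletableFuture.failedFuture(new IllegalStateException("boom"));

        System.out.println(willThrow(failed, e -> "boom".equals(e.getMessage()), 1000));
    }
}
```

A real matcher would additionally describe the mismatch in the assertion message, which a boolean helper like this cannot do.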





[GitHub] [ignite-3] rpuch commented on a diff in pull request #2055: IGNITE-19228 Schema validation during tx processing: common framework

Posted by "rpuch (via GitHub)" <gi...@apache.org>.
rpuch commented on code in PR #2055:
URL: https://github.com/apache/ignite-3/pull/2055#discussion_r1191327035


##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/schema/NonHistoricSchemas.java:
##########
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.table.distributed.schema;
+
+import static java.util.concurrent.CompletableFuture.completedFuture;
+import static java.util.stream.Collectors.toList;
+import static org.apache.ignite.lang.ErrorGroups.Client.PROTOCOL_ERR;
+
+import java.util.List;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import org.apache.ignite.internal.catalog.commands.DefaultValue;
+import org.apache.ignite.internal.catalog.descriptors.TableColumnDescriptor;
+import org.apache.ignite.internal.hlc.HybridTimestamp;
+import org.apache.ignite.internal.schema.Column;
+import org.apache.ignite.internal.schema.NativeType;
+import org.apache.ignite.internal.schema.NativeTypeSpec;
+import org.apache.ignite.internal.schema.SchemaDescriptor;
+import org.apache.ignite.internal.schema.SchemaManager;
+import org.apache.ignite.internal.schema.SchemaRegistry;
+import org.apache.ignite.lang.IgniteException;
+import org.apache.ignite.sql.ColumnType;
+
+/**
+ * A dummy implementation over {@link SchemaManager}. It is dummy because:
+ *
+ * <ul>
+ *     <li>It imitates historicity, but always takes the latest known schema</li>
+ *     <li>{@link #tableSchemaVersionsBetween(UUID, HybridTimestamp, HybridTimestamp)} always returns a single schema to avoid
+ *     validation failures</li>
+ * </ul>
+ *
+ * <p>The point of this implementation is to allow the system work in the pre-SchemaSync fashion before the switch to CatalogService
+ * is possible.
+ */
+// TODO: IGNITE-19447 - remove when switched to the CatalogService
+public class NonHistoricSchemas implements Schemas {
+    private final SchemaManager schemaManager;
+
+    public NonHistoricSchemas(SchemaManager schemaManager) {
+        this.schemaManager = schemaManager;
+    }
+
+    @Override
+    public CompletableFuture<?> waitForSchemasAvailability(HybridTimestamp ts) {
+        return completedFuture(null);
+    }
+
+    @Override
+    public List<FullTableSchema> tableSchemaVersionsBetween(UUID tableId, HybridTimestamp fromIncluding, HybridTimestamp toIncluding) {
+        SchemaRegistry schemaRegistry = schemaManager.schemaRegistry(tableId);
+
+        SchemaDescriptor schemaDescriptor = schemaRegistry.schema();
+        FullTableSchema fullSchema = new FullTableSchema(
+                1,
+                1,
+                schemaDescriptor.columnNames().stream()
+                        .map(colName -> {
+                            Column column = schemaDescriptor.column(colName);
+
+                            assert column != null;
+
+                            return columnType(colName, column);
+                        })
+                        .collect(toList()),
+                List.of()
+        );
+
+        return List.of(fullSchema);
+    }
+
+    private static TableColumnDescriptor columnType(String colName, Column column) {
+        return new TableColumnDescriptor(
+                colName,
+                columnTypeFromNativeType(column.type()),
+                column.nullable(),
+                DefaultValue.constant(column.defaultValue())
+        );
+    }
+
+    private static ColumnType columnTypeFromNativeType(NativeType type) {
+        return getColumnType(type.spec());
+    }
+
+    /**
+     * Detects {@link ColumnType} by {@link NativeTypeSpec}.
+     *
+     * @param spec Native type spec.
+     * @return Detected {@link ColumnType}.
+     */
+    public static ColumnType getColumnType(NativeTypeSpec spec) {
+        switch (spec) {
+            case INT8:
+                return ColumnType.INT8;
+
+            case INT16:
+                return ColumnType.INT16;
+
+            case INT32:
+                return ColumnType.INT32;
+
+            case INT64:
+                return ColumnType.INT64;
+
+            case FLOAT:
+                return ColumnType.FLOAT;
+
+            case DOUBLE:
+                return ColumnType.DOUBLE;
+
+            case DECIMAL:
+                return ColumnType.DECIMAL;
+
+            case NUMBER:
+                return ColumnType.NUMBER;
+
+            case UUID:
+                return ColumnType.UUID;
+
+            case STRING:
+                return ColumnType.STRING;
+
+            case BYTES:
+                return ColumnType.BYTE_ARRAY;
+
+            case BITMASK:
+                return ColumnType.BITMASK;
+
+            case DATE:
+                return ColumnType.DATE;
+
+            case TIME:
+                return ColumnType.TIME;
+
+            case DATETIME:
+                return ColumnType.DATETIME;
+
+            case TIMESTAMP:
+                return ColumnType.TIMESTAMP;
+
+            default:
+                throw new IgniteException(PROTOCOL_ERR, "Unsupported native type: " + spec);

Review Comment:
   I'm sorry :(





[GitHub] [ignite-3] rpuch commented on a diff in pull request #2055: IGNITE-19228 Schema validation during tx processing: common framework

Posted by "rpuch (via GitHub)" <gi...@apache.org>.
rpuch commented on code in PR #2055:
URL: https://github.com/apache/ignite-3/pull/2055#discussion_r1191273421


##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java:
##########
@@ -976,13 +983,34 @@ private CompletableFuture<Void> processTxFinishAction(TxFinishReplicaRequest req
         List<TablePartitionId> aggregatedGroupIds = request.groups().values().stream()
                 .flatMap(List::stream)
                 .map(IgniteBiTuple::get1)
-                .collect(Collectors.toList());
+                .collect(toList());
 
         UUID txId = request.txId();
 
-        boolean commit = request.commit();
+        return schemaCompatValidator.validateForwards(txId, aggregatedGroupIds, request.commit(), request.commitTimestamp())
+                .thenCompose(validationResult -> {
+                    boolean stillCommit = request.commit() && validationResult.isOk();
 
-        CompletableFuture<Object> changeStateFuture = finishTransaction(aggregatedGroupIds, txId, commit);
+                    return finishAndCleanup(request, stillCommit, aggregatedGroupIds, txId)
+                            .thenCompose(result -> {
+                                if (validationResult.isOk()) {
+                                    return completedFuture(result);
+                                } else {
+                                    return failedFuture(new AbortDueToIncompatibleSchemaException("Commit failed because schema "

Review Comment:
   The idea is that, if validation fails, we change the intention from commit to abort, so `finishTransaction()` will actually abort the transaction, and only then will we signal the exception.
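
The ordering described here (downgrade the intent, finish the transaction, then report the failure) can be simulated in isolation. This is a sketch under stated assumptions: `AbortThenSignal`, its `txState` field, and the simplified `finishTransaction(boolean)` are hypothetical stand-ins for the real replica listener logic, and `IllegalStateException` replaces the PR's exception type.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

// Hypothetical simulation; it does not reproduce the real PartitionReplicaListener.
final class AbortThenSignal {
    /** Final state recorded by the simulated finish step. */
    String txState = "PENDING";

    /** Stand-in for the real finish step: records the outcome and completes normally. */
    CompletableFuture<String> finishTransaction(boolean commit) {
        txState = commit ? "COMMITTED" : "ABORTED";
        return CompletableFuture.completedFuture(txState);
    }

    /** Downgrades commit to abort on validation failure, finishes, and only then reports the failure. */
    CompletableFuture<String> process(boolean commitRequested, boolean validationOk) {
        boolean stillCommit = commitRequested && validationOk;

        return finishTransaction(stillCommit).thenCompose(state -> {
            if (validationOk) {
                return CompletableFuture.completedFuture(state);
            }
            return CompletableFuture.failedFuture(
                    new IllegalStateException("Commit failed because schema is incompatible"));
        });
    }

    public static void main(String[] args) {
        AbortThenSignal tx = new AbortThenSignal();

        try {
            tx.process(true, false).join();
        } catch (CompletionException e) {
            // The state has already been changed by the time the failure is observed.
            System.out.println(tx.txState + ", then: " + e.getCause().getMessage());
        }
    }
}
```

In this sketch the caller sees a failed future, but the transaction state has already been set to aborted, which is the sequencing the comment explains.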





[GitHub] [ignite-3] tkalkirill merged pull request #2055: IGNITE-19228 Schema validation during tx processing: common framework

Posted by "tkalkirill (via GitHub)" <gi...@apache.org>.
tkalkirill merged PR #2055:
URL: https://github.com/apache/ignite-3/pull/2055




[GitHub] [ignite-3] sashapolo commented on a diff in pull request #2055: IGNITE-19228 Schema validation during tx processing: common framework

Posted by "sashapolo (via GitHub)" <gi...@apache.org>.
sashapolo commented on code in PR #2055:
URL: https://github.com/apache/ignite-3/pull/2055#discussion_r1190873028


##########
modules/core/src/testFixtures/java/org/apache/ignite/internal/testframework/asserts/CompletableFutureAssert.java:
##########
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.testframework.asserts;
+
+import static org.junit.jupiter.api.Assertions.fail;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+/**
+ * Assertions related to {@link CompletableFuture}.
+ */
+public class CompletableFutureAssert {

Review Comment:
   This class is redundant; you should use `CompletableFutureExceptionMatcher` instead.



##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/AbortDueToIncompatibleSchemaException.java:
##########
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.table.distributed.replicator;
+
+import org.apache.ignite.lang.ErrorGroups.Transactions;
+import org.apache.ignite.tx.TransactionException;
+
+/**
+ * Thrown when, during an attempt to commit a transaction, it turns out that the transaction cannot be committed
+ * because an incompatible schema change has happened.
+ */
+public class AbortDueToIncompatibleSchemaException extends TransactionException {

Review Comment:
   I would propose renaming this exception to `IncompatibleSchemaAbortException`; it's a mouthful otherwise.



##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/schema/FullTableSchema.java:
##########
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.table.distributed.schema;
+
+import static java.util.function.Function.identity;
+import static java.util.stream.Collectors.toList;
+import static java.util.stream.Collectors.toMap;
+
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import org.apache.ignite.internal.catalog.descriptors.IndexDescriptor;
+import org.apache.ignite.internal.catalog.descriptors.TableColumnDescriptor;
+
+/**
+ * Represents a full table schema: that is, the definition of the table and all objects (indexes, constraints, etc)
+ * that belong to the table.
+ */
+public class FullTableSchema {
+    private final int schemaVersion;
+    private final int tableId;
+
+    private final List<TableColumnDescriptor> columns;
+
+    private final List<IndexDescriptor> indexes;
+
+    /**
+     * Constructor.
+     */
+    public FullTableSchema(
+            int schemaVersion,
+            int tableId,
+            List<TableColumnDescriptor> columns,
+            List<IndexDescriptor> indexes
+    ) {
+        this.schemaVersion = schemaVersion;
+        this.tableId = tableId;
+        this.columns = columns;
+        this.indexes = indexes;
+    }
+
+    /**
+     * Returns version of the table definition.
+     *
+     * @return Version of the table definition.
+     */
+    public int schemaVersion() {
+        return schemaVersion;
+    }
+
+    /**
+     * Returns ID of the table.
+     *
+     * @return ID of the table
+     */
+    public int tableId() {
+        return tableId;
+    }
+
+    /**
+     * Returns definitions of the columns of the table.
+     *
+     * @return Definitions of the columns of the table.
+     */
+    public List<TableColumnDescriptor> columns() {
+        return columns;
+    }
+
+    /**
+     * Returns definitions of indexes belonging to the table.
+     *
+     * @return Definitions of indexes belonging to the table.
+     */
+    public List<IndexDescriptor> indexes() {
+        return indexes;
+    }
+
+    /**
+     * Computes a diff of this schema from a previous schema.
+     *
+     * @param prevSchema Previous table schema.
+     * @return Difference between the schemas.
+     */
+    public TableDefinitionDiff diffFrom(FullTableSchema prevSchema) {
+        Map<String, TableColumnDescriptor> prevColumnsByName = prevSchema.columns.stream()
+                .collect(toMap(TableColumnDescriptor::name, identity()));
+        Map<String, TableColumnDescriptor> thisColumnsByName = this.columns.stream()
+                .collect(toMap(TableColumnDescriptor::name, identity()));
+
+        Set<String> addedColumnNames = subtract(thisColumnsByName.keySet(), prevColumnsByName.keySet());
+        Set<String> removedColumnNames = subtract(prevColumnsByName.keySet(), thisColumnsByName.keySet());
+
+        List<TableColumnDescriptor> addedColumns = thisColumnsByName.values().stream()
+                .filter(col -> addedColumnNames.contains(col.name()))
+                .collect(toList());
+        List<TableColumnDescriptor> removedColumns = prevColumnsByName.values().stream()
+                .filter(col -> removedColumnNames.contains(col.name()))
+                .collect(toList());
+
+        Set<String> intersectionColumnNames = intersect(thisColumnsByName.keySet(), prevColumnsByName.keySet());
+        List<ColumnDefinitionDiff> changedColumns = new ArrayList<>();
+        for (String commonColumnName : intersectionColumnNames) {
+            TableColumnDescriptor prevColumn = prevColumnsByName.get(commonColumnName);
+            TableColumnDescriptor thisColumn = thisColumnsByName.get(commonColumnName);
+
+            if (columnChanged(prevColumn, thisColumn)) {
+                changedColumns.add(new ColumnDefinitionDiff(prevColumn, thisColumn));
+            }
+        }
+
+        Map<String, IndexDescriptor> prevIndexesByName = prevSchema.indexes.stream()
+                .collect(toMap(IndexDescriptor::name, identity()));
+        Map<String, IndexDescriptor> thisIndexesByName = this.indexes.stream()
+                .collect(toMap(IndexDescriptor::name, identity()));
+
+        Set<String> addedIndexNames = subtract(thisIndexesByName.keySet(), prevIndexesByName.keySet());
+        Set<String> removedIndexNames = subtract(prevIndexesByName.keySet(), thisIndexesByName.keySet());
+
+        List<IndexDescriptor> addedIndexes = thisIndexesByName.values().stream()
+                .filter(col -> addedIndexNames.contains(col.name()))
+                .collect(toList());
+        List<IndexDescriptor> removedIndexes = prevIndexesByName.values().stream()
+                .filter(col -> removedIndexNames.contains(col.name()))
+                .collect(toList());
+
+        return new TableDefinitionDiff(addedColumns, removedColumns, changedColumns, addedIndexes, removedIndexes);
+    }
+
+    private static Set<String> subtract(Set<String> minuend, Set<String> subtrahend) {

Review Comment:
   There's exactly the same method in `RebalanceUtil`; I would suggest moving it to a more common place, like `CollectionUtils`.
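
A shared, generic version of the two set helpers might look like the sketch below. The class name `SetOps` is hypothetical, and this is not the actual content of `CollectionUtils` or `RebalanceUtil`; it only shows that making the methods generic in `T` lets one copy serve both call sites.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical utility class; not the real CollectionUtils.
final class SetOps {
    private SetOps() {
    }

    /** Returns a new set with the elements of minuend that are not in subtrahend. */
    static <T> Set<T> subtract(Set<T> minuend, Set<T> subtrahend) {
        Set<T> result = new HashSet<>(minuend);
        result.removeAll(subtrahend);
        return result;
    }

    /** Returns a new set with the elements present in both arguments. */
    static <T> Set<T> intersect(Set<T> first, Set<T> second) {
        Set<T> result = new HashSet<>(first);
        result.retainAll(second);
        return result;
    }

    public static void main(String[] args) {
        Set<String> prev = Set.of("id", "name", "age");
        Set<String> curr = Set.of("id", "name", "email");

        System.out.println("added:   " + subtract(curr, prev));
        System.out.println("removed: " + subtract(prev, curr));
        System.out.println("common:  " + intersect(curr, prev));
    }
}
```

Both methods copy their first argument, so the inputs are left unmodified and may be immutable sets.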



##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/ForwardValidationResult.java:
##########
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.table.distributed.replicator;
+
+import java.util.Objects;
+import java.util.UUID;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Result of forward schema compatibility validation.
+ */
+public class ForwardValidationResult {
+    private static final ForwardValidationResult SUCCESS = new ForwardValidationResult(true, null, -1, -1);
+
+    private final boolean ok;
+    @Nullable
+    private final UUID failedTableId;
+    private final int fromSchemaVersion;
+    private final int toSchemaVersion;
+
+    /**
+     * Returns a successful validation result.
+     *
+     * @return A successful validation result.
+     */
+    public static ForwardValidationResult success() {
+        return SUCCESS;
+    }
+
+    /**
+     * Creates a validation result denoting validation failure.
+     *
+     * @param failedTableId Table which schema change is incompatible.
+     * @param fromSchemaVersion Version number of the schema from which an incompatible transition tried to be made.
+     * @param toSchemaVersion Version number of the schema to which an incompatible transition tried to be made.
+     * @return A validation result for a failure.
+     */
+    public static ForwardValidationResult failure(UUID failedTableId, int fromSchemaVersion, int toSchemaVersion) {
+        return new ForwardValidationResult(false, failedTableId, fromSchemaVersion, toSchemaVersion);
+    }
+
+    private ForwardValidationResult(boolean ok, @Nullable UUID failedTableId, int fromSchemaVersion, int toSchemaVersion) {
+        this.ok = ok;
+        this.failedTableId = failedTableId;
+        this.fromSchemaVersion = fromSchemaVersion;
+        this.toSchemaVersion = toSchemaVersion;
+    }
+
+    /**
+     * Returns whether the validation was successful.
+     *
+     * @return Whether the validation was successful
+     */
+    public boolean isOk() {
+        return ok;
+    }
+
+    /**
+     * Returns ID of the table for which the validation has failed. Should only be called for a failed validation result,
+     * otherwise an exception is thrown.
+     *
+     * @return Table ID.
+     */
+    public UUID failedTableId() {
+        return Objects.requireNonNull(failedTableId);
+    }
+
+    /**
+     * Returns version number of the schema from which an incompatible transition tried to be made.
+     *
+     * @return Version number of the schema from which an incompatible transition tried to be made.
+     */
+    public int fromSchemaVersion() {
+        return fromSchemaVersion;

Review Comment:
   Should we also throw an exception here if this is a successful result?
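
One way to apply this suggestion is to guard every failure-only accessor with the same check. The sketch below is an assumption about how that could look, not the code that was merged: `GuardedValidationResult` and `ensureFailed()` are hypothetical names, and `IllegalStateException` is an arbitrary choice of exception.

```java
import java.util.UUID;

// Hypothetical variant of a validation result; not the PR's ForwardValidationResult.
final class GuardedValidationResult {
    private static final GuardedValidationResult SUCCESS = new GuardedValidationResult(true, null, -1, -1);

    private final boolean ok;
    private final UUID failedTableId;
    private final int fromSchemaVersion;
    private final int toSchemaVersion;

    private GuardedValidationResult(boolean ok, UUID failedTableId, int fromSchemaVersion, int toSchemaVersion) {
        this.ok = ok;
        this.failedTableId = failedTableId;
        this.fromSchemaVersion = fromSchemaVersion;
        this.toSchemaVersion = toSchemaVersion;
    }

    static GuardedValidationResult success() {
        return SUCCESS;
    }

    static GuardedValidationResult failure(UUID failedTableId, int fromSchemaVersion, int toSchemaVersion) {
        return new GuardedValidationResult(false, failedTableId, fromSchemaVersion, toSchemaVersion);
    }

    boolean isSuccessful() {
        return ok;
    }

    UUID failedTableId() {
        ensureFailed();
        return failedTableId;
    }

    int fromSchemaVersion() {
        ensureFailed();
        return fromSchemaVersion;
    }

    int toSchemaVersion() {
        ensureFailed();
        return toSchemaVersion;
    }

    /** Failure details are meaningless for a successful result, so reject the call. */
    private void ensureFailed() {
        if (ok) {
            throw new IllegalStateException("Failure details are not available for a successful result");
        }
    }

    public static void main(String[] args) {
        GuardedValidationResult failure = failure(UUID.randomUUID(), 1, 2);

        System.out.println(failure.fromSchemaVersion() + " -> " + failure.toSchemaVersion());
    }
}
```

With a single guard, all three accessors behave consistently, instead of only `failedTableId()` rejecting calls on a successful result.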



##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/ForwardValidationResult.java:
##########
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.table.distributed.replicator;
+
+import java.util.Objects;
+import java.util.UUID;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Result of forward schema compatibility validation.
+ */
+public class ForwardValidationResult {
+    private static final ForwardValidationResult SUCCESS = new ForwardValidationResult(true, null, -1, -1);
+
+    private final boolean ok;
+    @Nullable
+    private final UUID failedTableId;
+    private final int fromSchemaVersion;
+    private final int toSchemaVersion;
+
+    /**
+     * Returns a successful validation result.
+     *
+     * @return A successful validation result.
+     */
+    public static ForwardValidationResult success() {
+        return SUCCESS;
+    }
+
+    /**
+     * Creates a validation result denoting validation failure.
+     *
+     * @param failedTableId Table which schema change is incompatible.
+     * @param fromSchemaVersion Version number of the schema from which an incompatible transition tried to be made.
+     * @param toSchemaVersion Version number of the schema to which an incompatible transition tried to be made.
+     * @return A validation result for a failure.
+     */
+    public static ForwardValidationResult failure(UUID failedTableId, int fromSchemaVersion, int toSchemaVersion) {
+        return new ForwardValidationResult(false, failedTableId, fromSchemaVersion, toSchemaVersion);
+    }
+
+    private ForwardValidationResult(boolean ok, @Nullable UUID failedTableId, int fromSchemaVersion, int toSchemaVersion) {
+        this.ok = ok;
+        this.failedTableId = failedTableId;
+        this.fromSchemaVersion = fromSchemaVersion;
+        this.toSchemaVersion = toSchemaVersion;
+    }
+
+    /**
+     * Returns whether the validation was successful.
+     *
+     * @return Whether the validation was successful
+     */
+    public boolean isOk() {

Review Comment:
   I would propose renaming this method to `isSuccessful` or `isSuccess`, for consistency with the `success()` factory method.
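
   To illustrate the rename, here is a minimal sketch. It is a simplified stand-in for the class in the diff, not the PR's code: the class name `ValidationResultSketch` is hypothetical, and `isSuccessful` is just one of the two names proposed above.

   ```java
   import java.util.UUID;

   /** Simplified stand-in for ForwardValidationResult (hypothetical name, illustration only). */
   final class ValidationResultSketch {
       private static final ValidationResultSketch SUCCESS = new ValidationResultSketch(true, null, -1, -1);

       private final boolean successful;
       private final UUID failedTableId; // null when the validation succeeded
       private final int fromSchemaVersion;
       private final int toSchemaVersion;

       private ValidationResultSketch(boolean successful, UUID failedTableId, int fromSchemaVersion, int toSchemaVersion) {
           this.successful = successful;
           this.failedTableId = failedTableId;
           this.fromSchemaVersion = fromSchemaVersion;
           this.toSchemaVersion = toSchemaVersion;
       }

       static ValidationResultSketch success() {
           return SUCCESS;
       }

       static ValidationResultSketch failure(UUID failedTableId, int fromSchemaVersion, int toSchemaVersion) {
           return new ValidationResultSketch(false, failedTableId, fromSchemaVersion, toSchemaVersion);
       }

       /** Named after the success() factory, so the accessor and the factory read as a pair. */
       boolean isSuccessful() {
           return successful;
       }

       UUID failedTableId() {
           return failedTableId;
       }
   }
   ```

   With this naming, `ValidationResultSketch.success().isSuccessful()` reads naturally at the call site.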



##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java:
##########
@@ -976,13 +983,34 @@ private CompletableFuture<Void> processTxFinishAction(TxFinishReplicaRequest req
         List<TablePartitionId> aggregatedGroupIds = request.groups().values().stream()
                 .flatMap(List::stream)
                 .map(IgniteBiTuple::get1)
-                .collect(Collectors.toList());
+                .collect(toList());
 
         UUID txId = request.txId();
 
-        boolean commit = request.commit();
+        return schemaCompatValidator.validateForwards(txId, aggregatedGroupIds, request.commit(), request.commitTimestamp())
+                .thenCompose(validationResult -> {
+                    boolean stillCommit = request.commit() && validationResult.isOk();
 
-        CompletableFuture<Object> changeStateFuture = finishTransaction(aggregatedGroupIds, txId, commit);
+                    return finishAndCleanup(request, stillCommit, aggregatedGroupIds, txId)
+                            .thenCompose(result -> {
+                                if (validationResult.isOk()) {
+                                    return completedFuture(result);
+                                } else {
+                                    return failedFuture(new AbortDueToIncompatibleSchemaException("Commit failed because schema "

Review Comment:
   I don't understand this logic. Should throwing this exception affect the `finishTransaction` method so that the state of the transaction changes to `ABORTED`?
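
   For reference, one possible reading of the diff is sketched below, under assumptions: the transaction is finished with `stillCommit == false` when validation fails, and the returned future is failed only afterwards. All names in the sketch (`FinishFlowSketch`, `TxState`, the `stateStore` parameter) are hypothetical, and `IllegalStateException` stands in for `AbortDueToIncompatibleSchemaException`.

   ```java
   import java.util.concurrent.CompletableFuture;
   import java.util.concurrent.atomic.AtomicReference;

   /** Simplified model of the flow in the diff; every name here is hypothetical. */
   final class FinishFlowSketch {
       enum TxState { COMMITTED, ABORTED }

       /** Stand-in for finishAndCleanup(): records the final state, then completes. */
       static CompletableFuture<Void> finishAndCleanup(boolean commit, AtomicReference<TxState> stateStore) {
           stateStore.set(commit ? TxState.COMMITTED : TxState.ABORTED);
           return CompletableFuture.completedFuture(null);
       }

       static CompletableFuture<Void> processTxFinish(
               boolean commitRequested,
               boolean validationOk,
               AtomicReference<TxState> stateStore
       ) {
           // A failed validation turns a requested commit into an abort.
           boolean stillCommit = commitRequested && validationOk;

           return finishAndCleanup(stillCommit, stateStore).thenCompose(result -> {
               if (validationOk) {
                   return CompletableFuture.completedFuture(result);
               }
               // In this model the state was already recorded as ABORTED above;
               // the exception only reports the reason to the caller.
               return CompletableFuture.<Void>failedFuture(
                       new IllegalStateException("Commit failed: incompatible schema change"));
           });
       }

       public static void main(String[] args) {
           AtomicReference<TxState> stateStore = new AtomicReference<>();
           CompletableFuture<Void> future = processTxFinish(true, false, stateStore);
           System.out.println(stateStore.get() + ", failed=" + future.isCompletedExceptionally());
       }
   }
   ```

   If this reading is right, the exception does not need to reach `finishTransaction`, because the abort decision is made before it is called. Whether that matches the intent of the PR is exactly the question above.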



##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/SchemaCompatValidator.java:
##########
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.table.distributed.replicator;
+
+import static java.util.concurrent.CompletableFuture.completedFuture;
+import static java.util.stream.Collectors.toList;
+
+import java.util.List;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import org.apache.ignite.internal.hlc.HybridTimestamp;
+import org.apache.ignite.internal.replicator.TablePartitionId;
+import org.apache.ignite.internal.table.distributed.schema.FullTableSchema;
+import org.apache.ignite.internal.table.distributed.schema.Schemas;
+import org.apache.ignite.internal.table.distributed.schema.TableDefinitionDiff;
+import org.apache.ignite.internal.tx.TransactionIds;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Validates schema compatibility.
+ */
+class SchemaCompatValidator {
+    private final Schemas schemas;
+
+    SchemaCompatValidator(Schemas schemas) {
+        this.schemas = schemas;
+    }
+
+    /**
+     * Performs forward compatibility validation (if needed). That is, for each table enlisted in the transaction,
+     * checks to see whether the initial schema (identified by the begin timestamp) is forward-compatible with the
+     * commit schema (identified by the commit timestamp).
+     *
+     * <p>If doing an abort (and not commit), the validation always succeeds.
+     *
+     * @param txId ID of the transaction that gets validated.
+     * @param enlistedGroupIds IDs of the partitions that are enlisted with the transaction.
+     * @param commit Whether it's a commit attempt (otherwise it's an abort).
+     * @param commitTimestamp Commit timestamp (or {@code null} if it's an abort).
+     * @return Future completed with validation result.
+     */
+    CompletableFuture<ForwardValidationResult> validateForwards(
+            UUID txId,
+            List<TablePartitionId> enlistedGroupIds,
+            boolean commit,
+            @Nullable HybridTimestamp commitTimestamp
+    ) {
+        if (!commit) {
+            return completedFuture(ForwardValidationResult.success());
+        }
+
+        HybridTimestamp beginTimestamp = TransactionIds.beginTimestamp(txId);
+
+        List<UUID> tableIds = enlistedGroupIds.stream()

Review Comment:
   I think this intermediate variable is redundant; you can use the stream directly inside `thenApply` below.
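
   A rough sketch of what I mean, under assumptions: the rest of the method is not visible in this hunk, so `InlineStreamSketch`, `PartitionId`, `tableIdsWhenSchemasAvailable` and the `distinct()` step are hypothetical stand-ins rather than the PR's code.

   ```java
   import static java.util.stream.Collectors.toList;

   import java.util.List;
   import java.util.UUID;
   import java.util.concurrent.CompletableFuture;

   /** Illustration only: the stream is built inside thenApply, with no intermediate variable. */
   final class InlineStreamSketch {
       /** Hypothetical stand-in for TablePartitionId. */
       static final class PartitionId {
           private final UUID tableId;
           private final int partitionId;

           PartitionId(UUID tableId, int partitionId) {
               this.tableId = tableId;
               this.partitionId = partitionId;
           }

           UUID tableId() {
               return tableId;
           }
       }

       static CompletableFuture<List<UUID>> tableIdsWhenSchemasAvailable(
               CompletableFuture<?> schemasAvailable,
               List<PartitionId> enlistedGroupIds
       ) {
           // The table IDs are computed only once the schemas are available.
           return schemasAvailable.thenApply(ignored -> enlistedGroupIds.stream()
                   .map(PartitionId::tableId)
                   .distinct()
                   .collect(toList()));
       }
   }
   ```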



##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/schema/NonHistoricSchemas.java:
##########
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.table.distributed.schema;
+
+import static java.util.concurrent.CompletableFuture.completedFuture;
+import static java.util.stream.Collectors.toList;
+import static org.apache.ignite.lang.ErrorGroups.Client.PROTOCOL_ERR;
+
+import java.util.List;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import org.apache.ignite.internal.catalog.commands.DefaultValue;
+import org.apache.ignite.internal.catalog.descriptors.TableColumnDescriptor;
+import org.apache.ignite.internal.hlc.HybridTimestamp;
+import org.apache.ignite.internal.schema.Column;
+import org.apache.ignite.internal.schema.NativeType;
+import org.apache.ignite.internal.schema.NativeTypeSpec;
+import org.apache.ignite.internal.schema.SchemaDescriptor;
+import org.apache.ignite.internal.schema.SchemaManager;
+import org.apache.ignite.internal.schema.SchemaRegistry;
+import org.apache.ignite.lang.IgniteException;
+import org.apache.ignite.sql.ColumnType;
+
+/**
+ * A dummy implementation over {@link SchemaManager}. It is dummy because:
+ *
+ * <ul>
+ *     <li>It imitates historicity, but always takes the latest known schema</li>
+ *     <li>{@link #tableSchemaVersionsBetween(UUID, HybridTimestamp, HybridTimestamp)} always returns a single schema to avoid
+ *     validation failures</li>
+ * </ul>
+ *
+ * <p>The point of this implementation is to allow the system work in the pre-SchemaSync fashion before the switch to CatalogService
+ * is possible.
+ */
+// TODO: IGNITE-19447 - remove when switched to the CatalogService
+public class NonHistoricSchemas implements Schemas {
+    private final SchemaManager schemaManager;
+
+    public NonHistoricSchemas(SchemaManager schemaManager) {
+        this.schemaManager = schemaManager;
+    }
+
+    @Override
+    public CompletableFuture<?> waitForSchemasAvailability(HybridTimestamp ts) {
+        return completedFuture(null);
+    }
+
+    @Override
+    public List<FullTableSchema> tableSchemaVersionsBetween(UUID tableId, HybridTimestamp fromIncluding, HybridTimestamp toIncluding) {
+        SchemaRegistry schemaRegistry = schemaManager.schemaRegistry(tableId);
+
+        SchemaDescriptor schemaDescriptor = schemaRegistry.schema();
+        FullTableSchema fullSchema = new FullTableSchema(
+                1,
+                1,
+                schemaDescriptor.columnNames().stream()
+                        .map(colName -> {
+                            Column column = schemaDescriptor.column(colName);
+
+                            assert column != null;
+
+                            return columnType(colName, column);
+                        })
+                        .collect(toList()),
+                List.of()
+        );
+
+        return List.of(fullSchema);
+    }
+
+    private static TableColumnDescriptor columnType(String colName, Column column) {
+        return new TableColumnDescriptor(
+                colName,
+                columnTypeFromNativeType(column.type()),
+                column.nullable(),
+                DefaultValue.constant(column.defaultValue())
+        );
+    }
+
+    private static ColumnType columnTypeFromNativeType(NativeType type) {
+        return getColumnType(type.spec());
+    }
+
+    /**
+     * Detects {@link ColumnType} by {@link NativeTypeSpec}.
+     *
+     * @param spec Native type spec.
+     * @return Detected {@link ColumnType}.
+     */
+    public static ColumnType getColumnType(NativeTypeSpec spec) {

Review Comment:
   An exact copy of this method exists in `ClientTableCommon`. I would suggest extracting it into a common place.



##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/schema/NonHistoricSchemas.java:
##########
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.table.distributed.schema;
+
+import static java.util.concurrent.CompletableFuture.completedFuture;
+import static java.util.stream.Collectors.toList;
+import static org.apache.ignite.lang.ErrorGroups.Client.PROTOCOL_ERR;
+
+import java.util.List;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import org.apache.ignite.internal.catalog.commands.DefaultValue;
+import org.apache.ignite.internal.catalog.descriptors.TableColumnDescriptor;
+import org.apache.ignite.internal.hlc.HybridTimestamp;
+import org.apache.ignite.internal.schema.Column;
+import org.apache.ignite.internal.schema.NativeType;
+import org.apache.ignite.internal.schema.NativeTypeSpec;
+import org.apache.ignite.internal.schema.SchemaDescriptor;
+import org.apache.ignite.internal.schema.SchemaManager;
+import org.apache.ignite.internal.schema.SchemaRegistry;
+import org.apache.ignite.lang.IgniteException;
+import org.apache.ignite.sql.ColumnType;
+
+/**
+ * A dummy implementation over {@link SchemaManager}. It is dummy because:
+ *
+ * <ul>
+ *     <li>It imitates historicity, but always takes the latest known schema</li>
+ *     <li>{@link #tableSchemaVersionsBetween(UUID, HybridTimestamp, HybridTimestamp)} always returns a single schema to avoid
+ *     validation failures</li>
+ * </ul>
+ *
+ * <p>The point of this implementation is to allow the system work in the pre-SchemaSync fashion before the switch to CatalogService
+ * is possible.
+ */
+// TODO: IGNITE-19447 - remove when switched to the CatalogService
+public class NonHistoricSchemas implements Schemas {
+    private final SchemaManager schemaManager;
+
+    public NonHistoricSchemas(SchemaManager schemaManager) {
+        this.schemaManager = schemaManager;
+    }
+
+    @Override
+    public CompletableFuture<?> waitForSchemasAvailability(HybridTimestamp ts) {
+        return completedFuture(null);
+    }
+
+    @Override
+    public List<FullTableSchema> tableSchemaVersionsBetween(UUID tableId, HybridTimestamp fromIncluding, HybridTimestamp toIncluding) {
+        SchemaRegistry schemaRegistry = schemaManager.schemaRegistry(tableId);
+
+        SchemaDescriptor schemaDescriptor = schemaRegistry.schema();
+        FullTableSchema fullSchema = new FullTableSchema(
+                1,
+                1,
+                schemaDescriptor.columnNames().stream()
+                        .map(colName -> {
+                            Column column = schemaDescriptor.column(colName);
+
+                            assert column != null;
+
+                            return columnType(colName, column);
+                        })
+                        .collect(toList()),
+                List.of()
+        );
+
+        return List.of(fullSchema);
+    }
+
+    private static TableColumnDescriptor columnType(String colName, Column column) {

Review Comment:
   The `colName` parameter is redundant: `column` already contains the name.
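
   Something along these lines, as a simplified sketch: `Column` and `ColumnDescriptor` below are hypothetical minimal stand-ins for the real `Column` and `TableColumnDescriptor`, and the type and default value arguments are left out for brevity.

   ```java
   /** Illustration only: the descriptor is built from the column alone. */
   final class ColumnTypeSketch {
       /** Hypothetical minimal stand-in for the schema Column class. */
       static final class Column {
           private final String name;
           private final boolean nullable;

           Column(String name, boolean nullable) {
               this.name = name;
               this.nullable = nullable;
           }

           String name() {
               return name;
           }

           boolean nullable() {
               return nullable;
           }
       }

       /** Hypothetical minimal stand-in for TableColumnDescriptor. */
       static final class ColumnDescriptor {
           private final String name;
           private final boolean nullable;

           ColumnDescriptor(String name, boolean nullable) {
               this.name = name;
               this.nullable = nullable;
           }

           String name() {
               return name;
           }

           boolean nullable() {
               return nullable;
           }
       }

       /** No separate colName parameter: the name is taken from the column itself. */
       static ColumnDescriptor columnType(Column column) {
           return new ColumnDescriptor(column.name(), column.nullable());
       }
   }
   ```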



##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/schema/ColumnDefinitionDiff.java:
##########
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.table.distributed.schema;
+
+import org.apache.ignite.internal.catalog.descriptors.TableColumnDescriptor;
+
+/**
+ * Captures a difference between 'old' and 'new' versions of the same column definition.

Review Comment:
   Should the fields be called `old` and `new` instead of `prev` and `next`?



##########
modules/table/src/testFixtures/java/org/apache/ignite/internal/table/impl/DummySchemas.java:
##########
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.table.impl;
+
+import static java.util.concurrent.CompletableFuture.completedFuture;
+import static java.util.stream.Collectors.toList;
+
+import java.util.List;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import org.apache.ignite.internal.catalog.commands.DefaultValue;
+import org.apache.ignite.internal.catalog.descriptors.TableColumnDescriptor;
+import org.apache.ignite.internal.hlc.HybridTimestamp;
+import org.apache.ignite.internal.schema.Column;
+import org.apache.ignite.internal.schema.NativeType;
+import org.apache.ignite.internal.schema.SchemaDescriptor;
+import org.apache.ignite.internal.schema.SchemaRegistry;
+import org.apache.ignite.internal.table.distributed.schema.FullTableSchema;
+import org.apache.ignite.internal.table.distributed.schema.NonHistoricSchemas;
+import org.apache.ignite.internal.table.distributed.schema.Schemas;
+import org.apache.ignite.sql.ColumnType;
+
+/**
+ * Dummy {@link Schemas} implementation that is not historic and always uses same {@link SchemaRegistry}.
+ */
+public class DummySchemas implements Schemas {
+    private final SchemaRegistry schemaRegistry;
+
+    public DummySchemas(SchemaRegistry schemaRegistry) {
+        this.schemaRegistry = schemaRegistry;
+    }
+
+    @Override
+    public CompletableFuture<?> waitForSchemasAvailability(HybridTimestamp ts) {
+        return completedFuture(null);
+    }
+
+    @Override
+    public List<FullTableSchema> tableSchemaVersionsBetween(UUID tableId, HybridTimestamp fromIncluding, HybridTimestamp toIncluding) {
+        SchemaDescriptor schemaDescriptor = schemaRegistry.schema();
+        FullTableSchema fullSchema = new FullTableSchema(
+                1,
+                1,
+                schemaDescriptor.columnNames().stream()

Review Comment:
   Please extract this into a variable



##########
modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerTest.java:
##########
@@ -290,6 +314,8 @@ public void beforeTest(@InjectConfiguration DataStorageConfiguration dsCfg) {
 
         lenient().when(safeTimeClock.waitFor(any())).thenReturn(completedFuture(null));
 
+        lenient().when(schemas.waitForSchemasAvailability(any())).thenReturn(completedFuture(null));

Review Comment:
   I know that it's not related to this PR, but all these `lenient()` calls can be removed by annotating the test class with `@MockitoSettings(strictness = Strictness.LENIENT)`.



##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/schema/FullTableSchema.java:
##########
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.table.distributed.schema;
+
+import static java.util.function.Function.identity;
+import static java.util.stream.Collectors.toList;
+import static java.util.stream.Collectors.toMap;
+
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import org.apache.ignite.internal.catalog.descriptors.IndexDescriptor;
+import org.apache.ignite.internal.catalog.descriptors.TableColumnDescriptor;
+
+/**
+ * Represents a full table schema: that is, the definition of the table and all objects (indexes, constraints, etc)
+ * that belong to the table.
+ */
+public class FullTableSchema {
+    private final int schemaVersion;
+    private final int tableId;
+
+    private final List<TableColumnDescriptor> columns;
+
+    private final List<IndexDescriptor> indexes;
+
+    /**
+     * Constructor.
+     */
+    public FullTableSchema(
+            int schemaVersion,
+            int tableId,
+            List<TableColumnDescriptor> columns,
+            List<IndexDescriptor> indexes
+    ) {
+        this.schemaVersion = schemaVersion;
+        this.tableId = tableId;
+        this.columns = columns;
+        this.indexes = indexes;
+    }
+
+    /**
+     * Returns version of the table definition.
+     *
+     * @return Version of the table definition.
+     */
+    public int schemaVersion() {
+        return schemaVersion;
+    }
+
+    /**
+     * Returns ID of the table.
+     *
+     * @return ID of the table
+     */
+    public int tableId() {
+        return tableId;
+    }
+
+    /**
+     * Returns definitions of the columns of the table.
+     *
+     * @return Definitions of the columns of the table.
+     */
+    public List<TableColumnDescriptor> columns() {
+        return columns;
+    }
+
+    /**
+     * Returns definitions of indexes belonging to the table.
+     *
+     * @return Definitions of indexes belonging to the table.
+     */
+    public List<IndexDescriptor> indexes() {
+        return indexes;
+    }
+
+    /**
+     * Computes a diff of this schema from a previous schema.
+     *
+     * @param prevSchema Previous table schema.
+     * @return Difference between the schemas.
+     */
+    public TableDefinitionDiff diffFrom(FullTableSchema prevSchema) {
+        Map<String, TableColumnDescriptor> prevColumnsByName = prevSchema.columns.stream()
+                .collect(toMap(TableColumnDescriptor::name, identity()));
+        Map<String, TableColumnDescriptor> thisColumnsByName = this.columns.stream()
+                .collect(toMap(TableColumnDescriptor::name, identity()));
+
+        Set<String> addedColumnNames = subtract(thisColumnsByName.keySet(), prevColumnsByName.keySet());
+        Set<String> removedColumnNames = subtract(prevColumnsByName.keySet(), thisColumnsByName.keySet());
+
+        List<TableColumnDescriptor> addedColumns = thisColumnsByName.values().stream()
+                .filter(col -> addedColumnNames.contains(col.name()))
+                .collect(toList());
+        List<TableColumnDescriptor> removedColumns = prevColumnsByName.values().stream()
+                .filter(col -> removedColumnNames.contains(col.name()))
+                .collect(toList());
+
+        Set<String> intersectionColumnNames = intersect(thisColumnsByName.keySet(), prevColumnsByName.keySet());
+        List<ColumnDefinitionDiff> changedColumns = new ArrayList<>();
+        for (String commonColumnName : intersectionColumnNames) {
+            TableColumnDescriptor prevColumn = prevColumnsByName.get(commonColumnName);
+            TableColumnDescriptor thisColumn = thisColumnsByName.get(commonColumnName);
+
+            if (columnChanged(prevColumn, thisColumn)) {
+                changedColumns.add(new ColumnDefinitionDiff(prevColumn, thisColumn));
+            }
+        }
+
+        Map<String, IndexDescriptor> prevIndexesByName = prevSchema.indexes.stream()
+                .collect(toMap(IndexDescriptor::name, identity()));
+        Map<String, IndexDescriptor> thisIndexesByName = this.indexes.stream()
+                .collect(toMap(IndexDescriptor::name, identity()));
+
+        Set<String> addedIndexNames = subtract(thisIndexesByName.keySet(), prevIndexesByName.keySet());
+        Set<String> removedIndexNames = subtract(prevIndexesByName.keySet(), thisIndexesByName.keySet());
+
+        List<IndexDescriptor> addedIndexes = thisIndexesByName.values().stream()
+                .filter(col -> addedIndexNames.contains(col.name()))
+                .collect(toList());
+        List<IndexDescriptor> removedIndexes = prevIndexesByName.values().stream()
+                .filter(col -> removedIndexNames.contains(col.name()))
+                .collect(toList());
+
+        return new TableDefinitionDiff(addedColumns, removedColumns, changedColumns, addedIndexes, removedIndexes);
+    }
+
+    private static Set<String> subtract(Set<String> minuend, Set<String> subtrahend) {
+        Set<String> result = new HashSet<>(minuend);
+        result.removeAll(subtrahend);
+        return result;
+    }
+
+    private static Set<String> intersect(Set<String> set1, Set<String> set2) {

Review Comment:
   Same here



##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/schema/FullTableSchema.java:
##########
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.table.distributed.schema;
+
+import static java.util.function.Function.identity;
+import static java.util.stream.Collectors.toList;
+import static java.util.stream.Collectors.toMap;
+
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import org.apache.ignite.internal.catalog.descriptors.IndexDescriptor;
+import org.apache.ignite.internal.catalog.descriptors.TableColumnDescriptor;
+
+/**
+ * Represents a full table schema: that is, the definition of the table and all objects (indexes, constraints, etc)
+ * that belong to the table.
+ */
+public class FullTableSchema {
+    private final int schemaVersion;
+    private final int tableId;
+
+    private final List<TableColumnDescriptor> columns;
+
+    private final List<IndexDescriptor> indexes;
+
+    /**
+     * Constructor.
+     */
+    public FullTableSchema(
+            int schemaVersion,
+            int tableId,
+            List<TableColumnDescriptor> columns,
+            List<IndexDescriptor> indexes
+    ) {
+        this.schemaVersion = schemaVersion;
+        this.tableId = tableId;
+        this.columns = columns;
+        this.indexes = indexes;
+    }
+
+    /**
+     * Returns version of the table definition.
+     *
+     * @return Version of the table definition.
+     */
+    public int schemaVersion() {
+        return schemaVersion;
+    }
+
+    /**
+     * Returns ID of the table.
+     *
+     * @return ID of the table
+     */
+    public int tableId() {
+        return tableId;
+    }
+
+    /**
+     * Returns definitions of the columns of the table.
+     *
+     * @return Definitions of the columns of the table.
+     */
+    public List<TableColumnDescriptor> columns() {
+        return columns;
+    }
+
+    /**
+     * Returns definitions of indexes belonging to the table.
+     *
+     * @return Definitions of indexes belonging to the table.
+     */
+    public List<IndexDescriptor> indexes() {
+        return indexes;
+    }
+
+    /**
+     * Computes a diff of this schema from a previous schema.

Review Comment:
   ```suggestion
        * Computes a diff between this and a previous schema.
   ```



##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/schema/FullTableSchema.java:
##########
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.table.distributed.schema;
+
+import static java.util.function.Function.identity;
+import static java.util.stream.Collectors.toList;
+import static java.util.stream.Collectors.toMap;
+
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import org.apache.ignite.internal.catalog.descriptors.IndexDescriptor;
+import org.apache.ignite.internal.catalog.descriptors.TableColumnDescriptor;
+
+/**
+ * Represents a full table schema: that is, the definition of the table and all objects (indexes, constraints, etc)
+ * that belong to the table.
+ */
+public class FullTableSchema {
+    private final int schemaVersion;
+    private final int tableId;
+
+    private final List<TableColumnDescriptor> columns;
+
+    private final List<IndexDescriptor> indexes;
+
+    /**
+     * Constructor.
+     */
+    public FullTableSchema(
+            int schemaVersion,
+            int tableId,
+            List<TableColumnDescriptor> columns,
+            List<IndexDescriptor> indexes
+    ) {
+        this.schemaVersion = schemaVersion;
+        this.tableId = tableId;
+        this.columns = columns;
+        this.indexes = indexes;
+    }
+
+    /**
+     * Returns version of the table definition.
+     *
+     * @return Version of the table definition.
+     */
+    public int schemaVersion() {
+        return schemaVersion;
+    }
+
+    /**
+     * Returns ID of the table.
+     *
+     * @return ID of the table
+     */
+    public int tableId() {
+        return tableId;
+    }
+
+    /**
+     * Returns definitions of the columns of the table.
+     *
+     * @return Definitions of the columns of the table.
+     */
+    public List<TableColumnDescriptor> columns() {
+        return columns;
+    }
+
+    /**
+     * Returns definitions of indexes belonging to the table.
+     *
+     * @return Definitions of indexes belonging to the table.
+     */
+    public List<IndexDescriptor> indexes() {
+        return indexes;
+    }
+
+    /**
+     * Computes a diff of this schema from a previous schema.
+     *
+     * @param prevSchema Previous table schema.
+     * @return Difference between the schemas.
+     */
+    public TableDefinitionDiff diffFrom(FullTableSchema prevSchema) {
+        Map<String, TableColumnDescriptor> prevColumnsByName = prevSchema.columns.stream()
+                .collect(toMap(TableColumnDescriptor::name, identity()));
+        Map<String, TableColumnDescriptor> thisColumnsByName = this.columns.stream()
+                .collect(toMap(TableColumnDescriptor::name, identity()));
+
+        Set<String> addedColumnNames = subtract(thisColumnsByName.keySet(), prevColumnsByName.keySet());
+        Set<String> removedColumnNames = subtract(prevColumnsByName.keySet(), thisColumnsByName.keySet());
+
+        List<TableColumnDescriptor> addedColumns = thisColumnsByName.values().stream()
+                .filter(col -> addedColumnNames.contains(col.name()))
+                .collect(toList());
+        List<TableColumnDescriptor> removedColumns = prevColumnsByName.values().stream()
+                .filter(col -> removedColumnNames.contains(col.name()))
+                .collect(toList());
+
+        Set<String> intersectionColumnNames = intersect(thisColumnsByName.keySet(), prevColumnsByName.keySet());
+        List<ColumnDefinitionDiff> changedColumns = new ArrayList<>();
+        for (String commonColumnName : intersectionColumnNames) {
+            TableColumnDescriptor prevColumn = prevColumnsByName.get(commonColumnName);
+            TableColumnDescriptor thisColumn = thisColumnsByName.get(commonColumnName);
+
+            if (columnChanged(prevColumn, thisColumn)) {
+                changedColumns.add(new ColumnDefinitionDiff(prevColumn, thisColumn));
+            }
+        }
+
+        Map<String, IndexDescriptor> prevIndexesByName = prevSchema.indexes.stream()
+                .collect(toMap(IndexDescriptor::name, identity()));
+        Map<String, IndexDescriptor> thisIndexesByName = this.indexes.stream()
+                .collect(toMap(IndexDescriptor::name, identity()));
+
+        Set<String> addedIndexNames = subtract(thisIndexesByName.keySet(), prevIndexesByName.keySet());
+        Set<String> removedIndexNames = subtract(prevIndexesByName.keySet(), thisIndexesByName.keySet());
+
+        List<IndexDescriptor> addedIndexes = thisIndexesByName.values().stream()
+                .filter(col -> addedIndexNames.contains(col.name()))

Review Comment:
   Same here



##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java:
##########
@@ -976,13 +983,34 @@ private CompletableFuture<Void> processTxFinishAction(TxFinishReplicaRequest req
         List<TablePartitionId> aggregatedGroupIds = request.groups().values().stream()
                 .flatMap(List::stream)
                 .map(IgniteBiTuple::get1)
-                .collect(Collectors.toList());
+                .collect(toList());
 
         UUID txId = request.txId();
 
-        boolean commit = request.commit();
+        return schemaCompatValidator.validateForwards(txId, aggregatedGroupIds, request.commit(), request.commitTimestamp())

Review Comment:
   The API of the `schemaCompatValidator` is a little counter-intuitive to me: we should only validate a request if it's a commit request. I understand that we do this check *inside* `validateForwards`, but I still think it would be clearer to do it outside this method.
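
   A minimal sketch of what checking at the call site could look like. The `Validator` interface, the `finish` method and the `String` transaction ID are hypothetical stand-ins for illustration only; the real `validateForwards` takes more arguments and returns a richer result type.

```java
import java.util.concurrent.CompletableFuture;

final class CommitOnlyValidationSketch {
    /** Hypothetical stand-in for the validator; not the real signature. */
    interface Validator {
        CompletableFuture<Boolean> validateForwards(String txId);
    }

    /** Completes with true only if a commit was requested and validation passed. */
    static CompletableFuture<Boolean> finish(Validator validator, String txId, boolean commit) {
        // An abort needs no schema validation, so the validator is not called at all.
        CompletableFuture<Boolean> validation = commit
                ? validator.validateForwards(txId)
                : CompletableFuture.completedFuture(true);

        // Still commit only when the request asked for it and validation succeeded.
        return validation.thenApply(ok -> commit && ok);
    }
}
```

   With this shape the validator no longer needs a `commit` flag, and the caller makes it explicit that aborts skip validation.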



##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/schema/FullTableSchema.java:
##########
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.table.distributed.schema;
+
+import static java.util.function.Function.identity;
+import static java.util.stream.Collectors.toList;
+import static java.util.stream.Collectors.toMap;
+
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import org.apache.ignite.internal.catalog.descriptors.IndexDescriptor;
+import org.apache.ignite.internal.catalog.descriptors.TableColumnDescriptor;
+
+/**
+ * Represents a full table schema: that is, the definition of the table and all objects (indexes, constraints, etc)
+ * that belong to the table.
+ */
+public class FullTableSchema {
+    private final int schemaVersion;
+    private final int tableId;
+
+    private final List<TableColumnDescriptor> columns;
+
+    private final List<IndexDescriptor> indexes;
+
+    /**
+     * Constructor.
+     */
+    public FullTableSchema(
+            int schemaVersion,
+            int tableId,
+            List<TableColumnDescriptor> columns,
+            List<IndexDescriptor> indexes
+    ) {
+        this.schemaVersion = schemaVersion;
+        this.tableId = tableId;
+        this.columns = columns;
+        this.indexes = indexes;
+    }
+
+    /**
+     * Returns version of the table definition.
+     *
+     * @return Version of the table definition.
+     */
+    public int schemaVersion() {
+        return schemaVersion;
+    }
+
+    /**
+     * Returns ID of the table.
+     *
+     * @return ID of the table
+     */
+    public int tableId() {
+        return tableId;
+    }
+
+    /**
+     * Returns definitions of the columns of the table.
+     *
+     * @return Definitions of the columns of the table.
+     */
+    public List<TableColumnDescriptor> columns() {
+        return columns;
+    }
+
+    /**
+     * Returns definitions of indexes belonging to the table.
+     *
+     * @return Definitions of indexes belonging to the table.
+     */
+    public List<IndexDescriptor> indexes() {
+        return indexes;
+    }
+
+    /**
+     * Computes a diff of this schema from a previous schema.
+     *
+     * @param prevSchema Previous table schema.
+     * @return Difference between the schemas.
+     */
+    public TableDefinitionDiff diffFrom(FullTableSchema prevSchema) {
+        Map<String, TableColumnDescriptor> prevColumnsByName = prevSchema.columns.stream()
+                .collect(toMap(TableColumnDescriptor::name, identity()));

Review Comment:
   Maybe it would be a little bit cleaner to extract this code into a method like `groupByName`, because it's used 4 times in this method.
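
   Something along these lines, as a rough sketch. The generic signature and the class name are only an assumption about how such a helper might look; in the PR it would probably be a private static method of `FullTableSchema`.

```java
import static java.util.function.Function.identity;
import static java.util.stream.Collectors.toMap;

import java.util.List;
import java.util.Map;
import java.util.function.Function;

final class GroupByNameSketch {
    /**
     * Indexes items by name. Like the original inline code, this throws
     * IllegalStateException if two items share the same name.
     */
    static <T> Map<String, T> groupByName(List<T> items, Function<T, String> nameExtractor) {
        return items.stream().collect(toMap(nameExtractor, identity()));
    }
}
```

   The four call sites would then read like `groupByName(prevSchema.columns, TableColumnDescriptor::name)`.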



##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/schema/FullTableSchema.java:
##########
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.table.distributed.schema;
+
+import static java.util.function.Function.identity;
+import static java.util.stream.Collectors.toList;
+import static java.util.stream.Collectors.toMap;
+
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import org.apache.ignite.internal.catalog.descriptors.IndexDescriptor;
+import org.apache.ignite.internal.catalog.descriptors.TableColumnDescriptor;
+
+/**
+ * Represents a full table schema: that is, the definition of the table and all objects (indexes, constraints, etc)
+ * that belong to the table.
+ */
+public class FullTableSchema {
+    private final int schemaVersion;
+    private final int tableId;
+
+    private final List<TableColumnDescriptor> columns;
+
+    private final List<IndexDescriptor> indexes;
+
+    /**
+     * Constructor.
+     */
+    public FullTableSchema(
+            int schemaVersion,
+            int tableId,
+            List<TableColumnDescriptor> columns,
+            List<IndexDescriptor> indexes
+    ) {
+        this.schemaVersion = schemaVersion;
+        this.tableId = tableId;
+        this.columns = columns;
+        this.indexes = indexes;
+    }
+
+    /**
+     * Returns version of the table definition.
+     *
+     * @return Version of the table definition.
+     */
+    public int schemaVersion() {
+        return schemaVersion;
+    }
+
+    /**
+     * Returns ID of the table.
+     *
+     * @return ID of the table
+     */
+    public int tableId() {
+        return tableId;
+    }
+
+    /**
+     * Returns definitions of the columns of the table.
+     *
+     * @return Definitions of the columns of the table.
+     */
+    public List<TableColumnDescriptor> columns() {
+        return columns;
+    }
+
+    /**
+     * Returns definitions of indexes belonging to the table.
+     *
+     * @return Definitions of indexes belonging to the table.
+     */
+    public List<IndexDescriptor> indexes() {
+        return indexes;
+    }
+
+    /**
+     * Computes a diff of this schema from a previous schema.
+     *
+     * @param prevSchema Previous table schema.
+     * @return Difference between the schemas.
+     */
+    public TableDefinitionDiff diffFrom(FullTableSchema prevSchema) {
+        Map<String, TableColumnDescriptor> prevColumnsByName = prevSchema.columns.stream()
+                .collect(toMap(TableColumnDescriptor::name, identity()));
+        Map<String, TableColumnDescriptor> thisColumnsByName = this.columns.stream()
+                .collect(toMap(TableColumnDescriptor::name, identity()));
+
+        Set<String> addedColumnNames = subtract(thisColumnsByName.keySet(), prevColumnsByName.keySet());
+        Set<String> removedColumnNames = subtract(prevColumnsByName.keySet(), thisColumnsByName.keySet());
+
+        List<TableColumnDescriptor> addedColumns = thisColumnsByName.values().stream()
+                .filter(col -> addedColumnNames.contains(col.name()))

Review Comment:
   This is strange, `thisColumnsByName` is already a map from column name to the column you are looking for. Can we do `addedColumnNames.stream().map(thisColumnsByName::get)` instead?
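
   To illustrate, a small self-contained sketch (the `lookUp` helper and its class are hypothetical, not part of the PR):

```java
import static java.util.stream.Collectors.toList;

import java.util.List;
import java.util.Map;
import java.util.Set;

final class LookupByNameSketch {
    /** Resolves each name through the map instead of filtering all map values. */
    static <T> List<T> lookUp(Set<String> names, Map<String, T> byName) {
        // Every name is assumed to be a key of the map, as is the case for
        // addedColumnNames, which is derived from thisColumnsByName.keySet().
        return names.stream().map(byName::get).collect(toList());
    }
}
```

   One difference to keep in mind: the resulting order follows the iteration order of the name set rather than that of the map values.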



##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/schema/FullTableSchema.java:
##########
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.table.distributed.schema;
+
+import static java.util.function.Function.identity;
+import static java.util.stream.Collectors.toList;
+import static java.util.stream.Collectors.toMap;
+
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import org.apache.ignite.internal.catalog.descriptors.IndexDescriptor;
+import org.apache.ignite.internal.catalog.descriptors.TableColumnDescriptor;
+
+/**
+ * Represents a full table schema: that is, the definition of the table and all objects (indexes, constraints, etc)
+ * that belong to the table.
+ */
+public class FullTableSchema {
+    private final int schemaVersion;
+    private final int tableId;
+
+    private final List<TableColumnDescriptor> columns;
+
+    private final List<IndexDescriptor> indexes;
+
+    /**
+     * Constructor.
+     */
+    public FullTableSchema(
+            int schemaVersion,
+            int tableId,
+            List<TableColumnDescriptor> columns,
+            List<IndexDescriptor> indexes
+    ) {
+        this.schemaVersion = schemaVersion;
+        this.tableId = tableId;
+        this.columns = columns;
+        this.indexes = indexes;
+    }
+
+    /**
+     * Returns version of the table definition.
+     *
+     * @return Version of the table definition.
+     */
+    public int schemaVersion() {
+        return schemaVersion;
+    }
+
+    /**
+     * Returns ID of the table.
+     *
+     * @return ID of the table
+     */
+    public int tableId() {
+        return tableId;
+    }
+
+    /**
+     * Returns definitions of the columns of the table.
+     *
+     * @return Definitions of the columns of the table.
+     */
+    public List<TableColumnDescriptor> columns() {
+        return columns;
+    }
+
+    /**
+     * Returns definitions of indexes belonging to the table.
+     *
+     * @return Definitions of indexes belonging to the table.
+     */
+    public List<IndexDescriptor> indexes() {
+        return indexes;
+    }
+
+    /**
+     * Computes a diff of this schema from a previous schema.
+     *
+     * @param prevSchema Previous table schema.
+     * @return Difference between the schemas.
+     */
+    public TableDefinitionDiff diffFrom(FullTableSchema prevSchema) {
+        Map<String, TableColumnDescriptor> prevColumnsByName = prevSchema.columns.stream()
+                .collect(toMap(TableColumnDescriptor::name, identity()));
+        Map<String, TableColumnDescriptor> thisColumnsByName = this.columns.stream()
+                .collect(toMap(TableColumnDescriptor::name, identity()));
+
+        Set<String> addedColumnNames = subtract(thisColumnsByName.keySet(), prevColumnsByName.keySet());
+        Set<String> removedColumnNames = subtract(prevColumnsByName.keySet(), thisColumnsByName.keySet());
+
+        List<TableColumnDescriptor> addedColumns = thisColumnsByName.values().stream()
+                .filter(col -> addedColumnNames.contains(col.name()))
+                .collect(toList());
+        List<TableColumnDescriptor> removedColumns = prevColumnsByName.values().stream()
+                .filter(col -> removedColumnNames.contains(col.name()))

Review Comment:
   same here



##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java:
##########
@@ -976,13 +983,34 @@ private CompletableFuture<Void> processTxFinishAction(TxFinishReplicaRequest req
         List<TablePartitionId> aggregatedGroupIds = request.groups().values().stream()
                 .flatMap(List::stream)
                 .map(IgniteBiTuple::get1)
-                .collect(Collectors.toList());
+                .collect(toList());
 
         UUID txId = request.txId();
 
-        boolean commit = request.commit();
+        return schemaCompatValidator.validateForwards(txId, aggregatedGroupIds, request.commit(), request.commitTimestamp())
+                .thenCompose(validationResult -> {
+                    boolean stillCommit = request.commit() && validationResult.isOk();
 
-        CompletableFuture<Object> changeStateFuture = finishTransaction(aggregatedGroupIds, txId, commit);
+                    return finishAndCleanup(request, stillCommit, aggregatedGroupIds, txId)
+                            .thenCompose(result -> {

Review Comment:
   You should use `thenAccept` here instead of `thenCompose`
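
   A sketch of the difference, assuming the lambda only performs a side effect and then returns an already completed future (the excerpt above does not show its body, so this is a guess). The class, method names and the `AtomicReference` sink are made up for the example.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicReference;

final class ThenAcceptSketch {
    /** thenCompose forces the lambda to wrap "nothing" into a future. */
    static CompletableFuture<Void> withCompose(CompletableFuture<String> future, AtomicReference<String> sink) {
        return future.thenCompose(result -> {
            sink.set(result);

            return CompletableFuture.<Void>completedFuture(null);
        });
    }

    /** thenAccept expresses the same thing: consume the result, produce no value. */
    static CompletableFuture<Void> withAccept(CompletableFuture<String> future, AtomicReference<String> sink) {
        return future.thenAccept(sink::set);
    }
}
```

   `thenCompose` is still the right choice when the lambda starts another asynchronous operation and returns its future.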



##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/ForwardValidationResult.java:
##########
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.table.distributed.replicator;
+
+import java.util.Objects;
+import java.util.UUID;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Result of forward schema compatibility validation.
+ */
+public class ForwardValidationResult {
+    private static final ForwardValidationResult SUCCESS = new ForwardValidationResult(true, null, -1, -1);
+
+    private final boolean ok;
+    @Nullable
+    private final UUID failedTableId;
+    private final int fromSchemaVersion;
+    private final int toSchemaVersion;
+
+    /**
+     * Returns a successful validation result.
+     *
+     * @return A successful validation result.
+     */
+    public static ForwardValidationResult success() {
+        return SUCCESS;
+    }
+
+    /**
+     * Creates a validation result denoting validation failure.
+     *
+     * @param failedTableId Table which schema change is incompatible.
+     * @param fromSchemaVersion Version number of the schema from which an incompatible transition tried to be made.
+     * @param toSchemaVersion Version number of the schema to which an incompatible transition tried to be made.
+     * @return A validation result for a failure.
+     */
+    public static ForwardValidationResult failure(UUID failedTableId, int fromSchemaVersion, int toSchemaVersion) {
+        return new ForwardValidationResult(false, failedTableId, fromSchemaVersion, toSchemaVersion);
+    }
+
+    private ForwardValidationResult(boolean ok, @Nullable UUID failedTableId, int fromSchemaVersion, int toSchemaVersion) {
+        this.ok = ok;
+        this.failedTableId = failedTableId;
+        this.fromSchemaVersion = fromSchemaVersion;
+        this.toSchemaVersion = toSchemaVersion;
+    }
+
+    /**
+     * Returns whether the validation was successful.
+     *
+     * @return Whether the validation was successful
+     */
+    public boolean isOk() {
+        return ok;
+    }
+
+    /**
+     * Returns ID of the table for which the validation has failed. Should only be called for a failed validation result,
+     * otherwise an exception is thrown.
+     *
+     * @return Table ID.
+     */
+    public UUID failedTableId() {
+        return Objects.requireNonNull(failedTableId);
+    }
+
+    /**
+     * Returns version number of the schema from which an incompatible transition tried to be made.
+     *
+     * @return Version number of the schema from which an incompatible transition tried to be made.
+     */
+    public int fromSchemaVersion() {
+        return fromSchemaVersion;
+    }
+
+    /**
+     * Returns version number of the schema to which an incompatible transition tried to be made.
+     *
+     * @return Version number of the schema to which an incompatible transition tried to be made.
+     */
+    public int toSchemaVersion() {
+        return toSchemaVersion;

Review Comment:
   And here?



##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/schema/NonHistoricSchemas.java:
##########
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.table.distributed.schema;
+
+import static java.util.concurrent.CompletableFuture.completedFuture;
+import static java.util.stream.Collectors.toList;
+import static org.apache.ignite.lang.ErrorGroups.Client.PROTOCOL_ERR;
+
+import java.util.List;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import org.apache.ignite.internal.catalog.commands.DefaultValue;
+import org.apache.ignite.internal.catalog.descriptors.TableColumnDescriptor;
+import org.apache.ignite.internal.hlc.HybridTimestamp;
+import org.apache.ignite.internal.schema.Column;
+import org.apache.ignite.internal.schema.NativeType;
+import org.apache.ignite.internal.schema.NativeTypeSpec;
+import org.apache.ignite.internal.schema.SchemaDescriptor;
+import org.apache.ignite.internal.schema.SchemaManager;
+import org.apache.ignite.internal.schema.SchemaRegistry;
+import org.apache.ignite.lang.IgniteException;
+import org.apache.ignite.sql.ColumnType;
+
+/**
+ * A dummy implementation over {@link SchemaManager}. It is dummy because:
+ *
+ * <ul>
+ *     <li>It imitates historicity, but always takes the latest known schema</li>
+ *     <li>{@link #tableSchemaVersionsBetween(UUID, HybridTimestamp, HybridTimestamp)} always returns a single schema to avoid
+ *     validation failures</li>
+ * </ul>
+ *
+ * <p>The point of this implementation is to allow the system work in the pre-SchemaSync fashion before the switch to CatalogService
+ * is possible.
+ */
+// TODO: IGNITE-19447 - remove when switched to the CatalogService
+public class NonHistoricSchemas implements Schemas {
+    private final SchemaManager schemaManager;
+
+    public NonHistoricSchemas(SchemaManager schemaManager) {
+        this.schemaManager = schemaManager;
+    }
+
+    @Override
+    public CompletableFuture<?> waitForSchemasAvailability(HybridTimestamp ts) {
+        return completedFuture(null);
+    }
+
+    @Override
+    public List<FullTableSchema> tableSchemaVersionsBetween(UUID tableId, HybridTimestamp fromIncluding, HybridTimestamp toIncluding) {
+        SchemaRegistry schemaRegistry = schemaManager.schemaRegistry(tableId);
+
+        SchemaDescriptor schemaDescriptor = schemaRegistry.schema();
+        FullTableSchema fullSchema = new FullTableSchema(
+                1,
+                1,
+                schemaDescriptor.columnNames().stream()

Review Comment:
   Please extract this into a variable



##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/schema/NonHistoricSchemas.java:
##########
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.table.distributed.schema;
+
+import static java.util.concurrent.CompletableFuture.completedFuture;
+import static java.util.stream.Collectors.toList;
+import static org.apache.ignite.lang.ErrorGroups.Client.PROTOCOL_ERR;
+
+import java.util.List;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import org.apache.ignite.internal.catalog.commands.DefaultValue;
+import org.apache.ignite.internal.catalog.descriptors.TableColumnDescriptor;
+import org.apache.ignite.internal.hlc.HybridTimestamp;
+import org.apache.ignite.internal.schema.Column;
+import org.apache.ignite.internal.schema.NativeType;
+import org.apache.ignite.internal.schema.NativeTypeSpec;
+import org.apache.ignite.internal.schema.SchemaDescriptor;
+import org.apache.ignite.internal.schema.SchemaManager;
+import org.apache.ignite.internal.schema.SchemaRegistry;
+import org.apache.ignite.lang.IgniteException;
+import org.apache.ignite.sql.ColumnType;
+
+/**
+ * A dummy implementation over {@link SchemaManager}. It is dummy because:
+ *
+ * <ul>
+ *     <li>It imitates historicity, but always takes the latest known schema</li>
+ *     <li>{@link #tableSchemaVersionsBetween(UUID, HybridTimestamp, HybridTimestamp)} always returns a single schema to avoid
+ *     validation failures</li>
+ * </ul>
+ *
+ * <p>The point of this implementation is to allow the system work in the pre-SchemaSync fashion before the switch to CatalogService
+ * is possible.
+ */
+// TODO: IGNITE-19447 - remove when switched to the CatalogService
+public class NonHistoricSchemas implements Schemas {
+    private final SchemaManager schemaManager;
+
+    public NonHistoricSchemas(SchemaManager schemaManager) {
+        this.schemaManager = schemaManager;
+    }
+
+    @Override
+    public CompletableFuture<?> waitForSchemasAvailability(HybridTimestamp ts) {
+        return completedFuture(null);
+    }
+
+    @Override
+    public List<FullTableSchema> tableSchemaVersionsBetween(UUID tableId, HybridTimestamp fromIncluding, HybridTimestamp toIncluding) {
+        SchemaRegistry schemaRegistry = schemaManager.schemaRegistry(tableId);
+
+        SchemaDescriptor schemaDescriptor = schemaRegistry.schema();
+        FullTableSchema fullSchema = new FullTableSchema(
+                1,
+                1,
+                schemaDescriptor.columnNames().stream()
+                        .map(colName -> {
+                            Column column = schemaDescriptor.column(colName);
+
+                            assert column != null;
+
+                            return columnType(colName, column);
+                        })
+                        .collect(toList()),
+                List.of()
+        );
+
+        return List.of(fullSchema);
+    }
+
+    private static TableColumnDescriptor columnType(String colName, Column column) {
+        return new TableColumnDescriptor(
+                colName,
+                columnTypeFromNativeType(column.type()),
+                column.nullable(),
+                DefaultValue.constant(column.defaultValue())
+        );
+    }
+
+    private static ColumnType columnTypeFromNativeType(NativeType type) {
+        return getColumnType(type.spec());
+    }
+
+    /**
+     * Detects {@link ColumnType} by {@link NativeTypeSpec}.
+     *
+     * @param spec Native type spec.
+     * @return Detected {@link ColumnType}.
+     */
+    public static ColumnType getColumnType(NativeTypeSpec spec) {
+        switch (spec) {
+            case INT8:
+                return ColumnType.INT8;
+
+            case INT16:
+                return ColumnType.INT16;
+
+            case INT32:
+                return ColumnType.INT32;
+
+            case INT64:
+                return ColumnType.INT64;
+
+            case FLOAT:
+                return ColumnType.FLOAT;
+
+            case DOUBLE:
+                return ColumnType.DOUBLE;
+
+            case DECIMAL:
+                return ColumnType.DECIMAL;
+
+            case NUMBER:
+                return ColumnType.NUMBER;
+
+            case UUID:
+                return ColumnType.UUID;
+
+            case STRING:
+                return ColumnType.STRING;
+
+            case BYTES:
+                return ColumnType.BYTE_ARRAY;
+
+            case BITMASK:
+                return ColumnType.BITMASK;
+
+            case DATE:
+                return ColumnType.DATE;
+
+            case TIME:
+                return ColumnType.TIME;
+
+            case DATETIME:
+                return ColumnType.DATETIME;
+
+            case TIMESTAMP:
+                return ColumnType.TIMESTAMP;
+
+            default:
+                throw new IgniteException(PROTOCOL_ERR, "Unsupported native type: " + spec);

Review Comment:
   Wait a second, you indeed copy-pasted it from `ClientTableCommon`, shame on you!



##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/schema/NonHistoricSchemas.java:
##########
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.table.distributed.schema;
+
+import static java.util.concurrent.CompletableFuture.completedFuture;
+import static java.util.stream.Collectors.toList;
+import static org.apache.ignite.lang.ErrorGroups.Client.PROTOCOL_ERR;
+
+import java.util.List;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import org.apache.ignite.internal.catalog.commands.DefaultValue;
+import org.apache.ignite.internal.catalog.descriptors.TableColumnDescriptor;
+import org.apache.ignite.internal.hlc.HybridTimestamp;
+import org.apache.ignite.internal.schema.Column;
+import org.apache.ignite.internal.schema.NativeType;
+import org.apache.ignite.internal.schema.NativeTypeSpec;
+import org.apache.ignite.internal.schema.SchemaDescriptor;
+import org.apache.ignite.internal.schema.SchemaManager;
+import org.apache.ignite.internal.schema.SchemaRegistry;
+import org.apache.ignite.lang.IgniteException;
+import org.apache.ignite.sql.ColumnType;
+
+/**
+ * A dummy implementation over {@link SchemaManager}. It is dummy because:
+ *
+ * <ul>
+ *     <li>It imitates historicity, but always takes the latest known schema</li>
+ *     <li>{@link #tableSchemaVersionsBetween(UUID, HybridTimestamp, HybridTimestamp)} always returns a single schema to avoid
+ *     validation failures</li>
+ * </ul>
+ *
+ * <p>The point of this implementation is to allow the system work in the pre-SchemaSync fashion before the switch to CatalogService
+ * is possible.
+ */
+// TODO: IGNITE-19447 - remove when switched to the CatalogService
+public class NonHistoricSchemas implements Schemas {
+    private final SchemaManager schemaManager;
+
+    public NonHistoricSchemas(SchemaManager schemaManager) {
+        this.schemaManager = schemaManager;
+    }
+
+    @Override
+    public CompletableFuture<?> waitForSchemasAvailability(HybridTimestamp ts) {
+        return completedFuture(null);
+    }
+
+    @Override
+    public List<FullTableSchema> tableSchemaVersionsBetween(UUID tableId, HybridTimestamp fromIncluding, HybridTimestamp toIncluding) {
+        SchemaRegistry schemaRegistry = schemaManager.schemaRegistry(tableId);
+
+        SchemaDescriptor schemaDescriptor = schemaRegistry.schema();
+        FullTableSchema fullSchema = new FullTableSchema(
+                1,
+                1,
+                schemaDescriptor.columnNames().stream()
+                        .map(colName -> {
+                            Column column = schemaDescriptor.column(colName);
+
+                            assert column != null;
+
+                            return columnType(colName, column);
+                        })
+                        .collect(toList()),
+                List.of()
+        );
+
+        return List.of(fullSchema);
+    }
+
+    private static TableColumnDescriptor columnType(String colName, Column column) {

Review Comment:
   Should be called `columnDescriptor`



##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/schema/NonHistoricSchemas.java:
##########
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.table.distributed.schema;
+
+import static java.util.concurrent.CompletableFuture.completedFuture;
+import static java.util.stream.Collectors.toList;
+import static org.apache.ignite.lang.ErrorGroups.Client.PROTOCOL_ERR;
+
+import java.util.List;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import org.apache.ignite.internal.catalog.commands.DefaultValue;
+import org.apache.ignite.internal.catalog.descriptors.TableColumnDescriptor;
+import org.apache.ignite.internal.hlc.HybridTimestamp;
+import org.apache.ignite.internal.schema.Column;
+import org.apache.ignite.internal.schema.NativeType;
+import org.apache.ignite.internal.schema.NativeTypeSpec;
+import org.apache.ignite.internal.schema.SchemaDescriptor;
+import org.apache.ignite.internal.schema.SchemaManager;
+import org.apache.ignite.internal.schema.SchemaRegistry;
+import org.apache.ignite.lang.IgniteException;
+import org.apache.ignite.sql.ColumnType;
+
+/**
+ * A dummy implementation over {@link SchemaManager}. It is dummy because:
+ *
+ * <ul>
+ *     <li>It imitates historicity, but always takes the latest known schema</li>
+ *     <li>{@link #tableSchemaVersionsBetween(UUID, HybridTimestamp, HybridTimestamp)} always returns a single schema to avoid
+ *     validation failures</li>
+ * </ul>
+ *
+ * <p>The point of this implementation is to allow the system work in the pre-SchemaSync fashion before the switch to CatalogService
+ * is possible.
+ */
+// TODO: IGNITE-19447 - remove when switched to the CatalogService
+public class NonHistoricSchemas implements Schemas {
+    private final SchemaManager schemaManager;
+
+    public NonHistoricSchemas(SchemaManager schemaManager) {
+        this.schemaManager = schemaManager;
+    }
+
+    @Override
+    public CompletableFuture<?> waitForSchemasAvailability(HybridTimestamp ts) {
+        return completedFuture(null);
+    }
+
+    @Override
+    public List<FullTableSchema> tableSchemaVersionsBetween(UUID tableId, HybridTimestamp fromIncluding, HybridTimestamp toIncluding) {
+        SchemaRegistry schemaRegistry = schemaManager.schemaRegistry(tableId);
+
+        SchemaDescriptor schemaDescriptor = schemaRegistry.schema();
+        FullTableSchema fullSchema = new FullTableSchema(
+                1,
+                1,
+                schemaDescriptor.columnNames().stream()
+                        .map(colName -> {
+                            Column column = schemaDescriptor.column(colName);
+
+                            assert column != null;
+
+                            return columnType(colName, column);
+                        })
+                        .collect(toList()),
+                List.of()
+        );
+
+        return List.of(fullSchema);
+    }
+
+    private static TableColumnDescriptor columnType(String colName, Column column) {
+        return new TableColumnDescriptor(
+                colName,
+                columnTypeFromNativeType(column.type()),
+                column.nullable(),
+                DefaultValue.constant(column.defaultValue())
+        );
+    }
+
+    private static ColumnType columnTypeFromNativeType(NativeType type) {

Review Comment:
   This method seems redundant to me



##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/schema/NonHistoricSchemas.java:
##########
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.table.distributed.schema;
+
+import static java.util.concurrent.CompletableFuture.completedFuture;
+import static java.util.stream.Collectors.toList;
+import static org.apache.ignite.lang.ErrorGroups.Client.PROTOCOL_ERR;
+
+import java.util.List;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import org.apache.ignite.internal.catalog.commands.DefaultValue;
+import org.apache.ignite.internal.catalog.descriptors.TableColumnDescriptor;
+import org.apache.ignite.internal.hlc.HybridTimestamp;
+import org.apache.ignite.internal.schema.Column;
+import org.apache.ignite.internal.schema.NativeType;
+import org.apache.ignite.internal.schema.NativeTypeSpec;
+import org.apache.ignite.internal.schema.SchemaDescriptor;
+import org.apache.ignite.internal.schema.SchemaManager;
+import org.apache.ignite.internal.schema.SchemaRegistry;
+import org.apache.ignite.lang.IgniteException;
+import org.apache.ignite.sql.ColumnType;
+
+/**
+ * A dummy implementation over {@link SchemaManager}. It is dummy because:
+ *
+ * <ul>
+ *     <li>It imitates historicity, but always takes the latest known schema</li>
+ *     <li>{@link #tableSchemaVersionsBetween(UUID, HybridTimestamp, HybridTimestamp)} always returns a single schema to avoid
+ *     validation failures</li>
+ * </ul>
+ *
+ * <p>The point of this implementation is to allow the system work in the pre-SchemaSync fashion before the switch to CatalogService
+ * is possible.
+ */
+// TODO: IGNITE-19447 - remove when switched to the CatalogService
+public class NonHistoricSchemas implements Schemas {
+    private final SchemaManager schemaManager;
+
+    public NonHistoricSchemas(SchemaManager schemaManager) {
+        this.schemaManager = schemaManager;
+    }
+
+    @Override
+    public CompletableFuture<?> waitForSchemasAvailability(HybridTimestamp ts) {
+        return completedFuture(null);
+    }
+
+    @Override
+    public List<FullTableSchema> tableSchemaVersionsBetween(UUID tableId, HybridTimestamp fromIncluding, HybridTimestamp toIncluding) {
+        SchemaRegistry schemaRegistry = schemaManager.schemaRegistry(tableId);
+
+        SchemaDescriptor schemaDescriptor = schemaRegistry.schema();
+        FullTableSchema fullSchema = new FullTableSchema(
+                1,
+                1,
+                schemaDescriptor.columnNames().stream()
+                        .map(colName -> {
+                            Column column = schemaDescriptor.column(colName);
+
+                            assert column != null;
+
+                            return columnType(colName, column);
+                        })
+                        .collect(toList()),
+                List.of()
+        );
+
+        return List.of(fullSchema);
+    }
+
+    private static TableColumnDescriptor columnType(String colName, Column column) {
+        return new TableColumnDescriptor(
+                colName,
+                columnTypeFromNativeType(column.type()),
+                column.nullable(),
+                DefaultValue.constant(column.defaultValue())

Review Comment:
   Are you sure this is correct? There are multiple types of default values.



##########
modules/table/src/testFixtures/java/org/apache/ignite/internal/table/impl/DummySchemas.java:
##########
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.table.impl;
+
+import static java.util.concurrent.CompletableFuture.completedFuture;
+import static java.util.stream.Collectors.toList;
+
+import java.util.List;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import org.apache.ignite.internal.catalog.commands.DefaultValue;
+import org.apache.ignite.internal.catalog.descriptors.TableColumnDescriptor;
+import org.apache.ignite.internal.hlc.HybridTimestamp;
+import org.apache.ignite.internal.schema.Column;
+import org.apache.ignite.internal.schema.NativeType;
+import org.apache.ignite.internal.schema.SchemaDescriptor;
+import org.apache.ignite.internal.schema.SchemaRegistry;
+import org.apache.ignite.internal.table.distributed.schema.FullTableSchema;
+import org.apache.ignite.internal.table.distributed.schema.NonHistoricSchemas;
+import org.apache.ignite.internal.table.distributed.schema.Schemas;
+import org.apache.ignite.sql.ColumnType;
+
+/**
+ * Dummy {@link Schemas} implementation that is not historic and always uses same {@link SchemaRegistry}.
+ */
+public class DummySchemas implements Schemas {
+    private final SchemaRegistry schemaRegistry;
+
+    public DummySchemas(SchemaRegistry schemaRegistry) {
+        this.schemaRegistry = schemaRegistry;
+    }
+
+    @Override
+    public CompletableFuture<?> waitForSchemasAvailability(HybridTimestamp ts) {
+        return completedFuture(null);
+    }
+
+    @Override
+    public List<FullTableSchema> tableSchemaVersionsBetween(UUID tableId, HybridTimestamp fromIncluding, HybridTimestamp toIncluding) {
+        SchemaDescriptor schemaDescriptor = schemaRegistry.schema();
+        FullTableSchema fullSchema = new FullTableSchema(
+                1,
+                1,
+                schemaDescriptor.columnNames().stream()
+                        .map(colName -> {
+                            Column column = schemaDescriptor.column(colName);
+
+                            assert column != null;
+
+                            return columnType(colName, column);
+                        })
+                        .collect(toList()),
+                List.of()
+        );
+
+        return List.of(fullSchema);
+    }
+
+    private static TableColumnDescriptor columnType(String colName, Column column) {
+        return new TableColumnDescriptor(
+                colName,
+                columnTypeFromNativeType(column.type()),
+                column.nullable(),
+                DefaultValue.constant(column.defaultValue())
+        );
+    }
+
+    private static ColumnType columnTypeFromNativeType(NativeType type) {
+        return NonHistoricSchemas.getColumnType(type.spec());

Review Comment:
   Why do you reuse this method from `NonHistoricSchemas` and not the method that creates a `TableColumnDescriptor` from a `Column`?



##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/schema/FullTableSchema.java:
##########
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.table.distributed.schema;
+
+import static java.util.function.Function.identity;
+import static java.util.stream.Collectors.toList;
+import static java.util.stream.Collectors.toMap;
+
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import org.apache.ignite.internal.catalog.descriptors.IndexDescriptor;
+import org.apache.ignite.internal.catalog.descriptors.TableColumnDescriptor;
+
+/**
+ * Represents a full table schema: that is, the definition of the table and all objects (indexes, constraints, etc)
+ * that belong to the table.
+ */
+public class FullTableSchema {
+    private final int schemaVersion;
+    private final int tableId;
+
+    private final List<TableColumnDescriptor> columns;
+
+    private final List<IndexDescriptor> indexes;
+
+    /**
+     * Constructor.
+     */
+    public FullTableSchema(
+            int schemaVersion,
+            int tableId,
+            List<TableColumnDescriptor> columns,
+            List<IndexDescriptor> indexes
+    ) {
+        this.schemaVersion = schemaVersion;
+        this.tableId = tableId;
+        this.columns = columns;
+        this.indexes = indexes;
+    }
+
+    /**
+     * Returns version of the table definition.
+     *
+     * @return Version of the table definition.
+     */
+    public int schemaVersion() {
+        return schemaVersion;
+    }
+
+    /**
+     * Returns ID of the table.
+     *
+     * @return ID of the table
+     */
+    public int tableId() {
+        return tableId;
+    }
+
+    /**
+     * Returns definitions of the columns of the table.
+     *
+     * @return Definitions of the columns of the table.
+     */
+    public List<TableColumnDescriptor> columns() {
+        return columns;
+    }
+
+    /**
+     * Returns definitions of indexes belonging to the table.
+     *
+     * @return Definitions of indexes belonging to the table.
+     */
+    public List<IndexDescriptor> indexes() {
+        return indexes;
+    }
+
+    /**
+     * Computes a diff of this schema from a previous schema.
+     *
+     * @param prevSchema Previous table schema.
+     * @return Difference between the schemas.
+     */
+    public TableDefinitionDiff diffFrom(FullTableSchema prevSchema) {
+        Map<String, TableColumnDescriptor> prevColumnsByName = prevSchema.columns.stream()
+                .collect(toMap(TableColumnDescriptor::name, identity()));
+        Map<String, TableColumnDescriptor> thisColumnsByName = this.columns.stream()
+                .collect(toMap(TableColumnDescriptor::name, identity()));
+
+        Set<String> addedColumnNames = subtract(thisColumnsByName.keySet(), prevColumnsByName.keySet());
+        Set<String> removedColumnNames = subtract(prevColumnsByName.keySet(), thisColumnsByName.keySet());
+
+        List<TableColumnDescriptor> addedColumns = thisColumnsByName.values().stream()
+                .filter(col -> addedColumnNames.contains(col.name()))

Review Comment:
   Also, this way you don't have to collect `addedColumnNames` into a separate set, which avoids some copying: you can filter `thisColumnsByName.keySet()` in place.
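
   One possible reading of this suggestion, as a minimal sketch rather than the PR's actual code: `Col` is a hypothetical stand-in for `TableColumnDescriptor`, and `addedColumns` is an illustrative name that does not appear in the diff. The added columns are found by filtering against the previous names directly, without materializing an intermediate set of added names.

```java
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

// Hypothetical sketch: Col stands in for TableColumnDescriptor, only the name matters here.
final class ColumnDiffSketch {
    static final class Col {
        private final String name;

        Col(String name) {
            this.name = name;
        }

        String name() {
            return name;
        }
    }

    /** Returns the columns of {@code current} whose names are absent from {@code previous}. */
    static List<Col> addedColumns(List<Col> previous, List<Col> current) {
        Set<String> previousNames = previous.stream()
                .map(Col::name)
                .collect(Collectors.toSet());

        // Filter directly against the previous names; no separate "added names" set is built.
        return current.stream()
                .filter(col -> !previousNames.contains(col.name()))
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<Col> previous = List.of(new Col("id"), new Col("name"));
        List<Col> current = List.of(new Col("id"), new Col("name"), new Col("age"));

        for (Col col : addedColumns(previous, current)) {
            System.out.println(col.name());
        }
    }
}
```

   Removed columns could be computed the same way with the two arguments swapped.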





[GitHub] [ignite-3] sashapolo commented on a diff in pull request #2055: IGNITE-19228 Schema validation during tx processing: common framework

Posted by "sashapolo (via GitHub)" <gi...@apache.org>.
sashapolo commented on code in PR #2055:
URL: https://github.com/apache/ignite-3/pull/2055#discussion_r1192247489


##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/SchemaCompatValidator.java:
##########
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.table.distributed.replicator;
+
+import static java.util.concurrent.CompletableFuture.completedFuture;
+import static java.util.stream.Collectors.toList;
+
+import java.util.List;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import org.apache.ignite.internal.hlc.HybridTimestamp;
+import org.apache.ignite.internal.replicator.TablePartitionId;
+import org.apache.ignite.internal.table.distributed.schema.FullTableSchema;
+import org.apache.ignite.internal.table.distributed.schema.Schemas;
+import org.apache.ignite.internal.table.distributed.schema.TableDefinitionDiff;
+import org.apache.ignite.internal.tx.TransactionIds;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Validates schema compatibility.
+ */
+class SchemaCompatValidator {
+    private final Schemas schemas;
+
+    SchemaCompatValidator(Schemas schemas) {
+        this.schemas = schemas;
+    }
+
+    /**
+     * Performs forward compatibility validation (if needed). That is, for each table enlisted in the transaction,
+     * checks to see whether the initial schema (identified by the begin timestamp) is forward-compatible with the
+     * commit schema (identified by the commit timestamp).
+     *
+     * <p>If doing an abort (and not commit), the validation always succeeds.
+     *
+     * @param txId ID of the transaction that gets validated.
+     * @param enlistedGroupIds IDs of the partitions that are enlisted with the transaction.
+     * @param commit Whether it's a commit attempt (otherwise it's an abort).
+     * @param commitTimestamp Commit timestamp (or {@code null} if it's an abort).
+     * @return Future completed with validation result.
+     */
+    CompletableFuture<ForwardValidationResult> validateForwards(
+            UUID txId,
+            List<TablePartitionId> enlistedGroupIds,
+            boolean commit,
+            @Nullable HybridTimestamp commitTimestamp
+    ) {
+        if (!commit) {
+            return completedFuture(ForwardValidationResult.success());
+        }
+
+        HybridTimestamp beginTimestamp = TransactionIds.beginTimestamp(txId);
+
+        List<UUID> tableIds = enlistedGroupIds.stream()

Review Comment:
   ok





[GitHub] [ignite-3] rpuch commented on a diff in pull request #2055: IGNITE-19228 Schema validation during tx processing: common framework

Posted by "rpuch (via GitHub)" <gi...@apache.org>.
rpuch commented on code in PR #2055:
URL: https://github.com/apache/ignite-3/pull/2055#discussion_r1191277181


##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/SchemaCompatValidator.java:
##########
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.table.distributed.replicator;
+
+import static java.util.concurrent.CompletableFuture.completedFuture;
+import static java.util.stream.Collectors.toList;
+
+import java.util.List;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import org.apache.ignite.internal.hlc.HybridTimestamp;
+import org.apache.ignite.internal.replicator.TablePartitionId;
+import org.apache.ignite.internal.table.distributed.schema.FullTableSchema;
+import org.apache.ignite.internal.table.distributed.schema.Schemas;
+import org.apache.ignite.internal.table.distributed.schema.TableDefinitionDiff;
+import org.apache.ignite.internal.tx.TransactionIds;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Validates schema compatibility.
+ */
+class SchemaCompatValidator {
+    private final Schemas schemas;
+
+    SchemaCompatValidator(Schemas schemas) {
+        this.schemas = schemas;
+    }
+
+    /**
+     * Performs forward compatibility validation (if needed). That is, for each table enlisted in the transaction,
+     * checks to see whether the initial schema (identified by the begin timestamp) is forward-compatible with the
+     * commit schema (identified by the commit timestamp).
+     *
+     * <p>If doing an abort (and not commit), the validation always succeeds.
+     *
+     * @param txId ID of the transaction that gets validated.
+     * @param enlistedGroupIds IDs of the partitions that are enlisted with the transaction.
+     * @param commit Whether it's a commit attempt (otherwise it's an abort).
+     * @param commitTimestamp Commit timestamp (or {@code null} if it's an abort).
+     * @return Future completed with validation result.
+     */
+    CompletableFuture<ForwardValidationResult> validateForwards(
+            UUID txId,
+            List<TablePartitionId> enlistedGroupIds,
+            boolean commit,
+            @Nullable HybridTimestamp commitTimestamp
+    ) {
+        if (!commit) {
+            return completedFuture(ForwardValidationResult.success());
+        }
+
+        HybridTimestamp beginTimestamp = TransactionIds.beginTimestamp(txId);
+
+        List<UUID> tableIds = enlistedGroupIds.stream()

Review Comment:
   It looks too cumbersome inside the `for`. Introducing a variable makes the `for` loop easier to read, imho.
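
   To illustrate the readability point with a sketch (not the PR's code): `PartitionRef` is a hypothetical stand-in for `TablePartitionId`, and the `map`/`distinct` steps are assumptions, since the quoted hunk is cut off right after `enlistedGroupIds.stream()`. The stream pipeline is given a name first, so the `for` header stays short.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical sketch: PartitionRef stands in for TablePartitionId.
final class LoopVariableSketch {
    static final class PartitionRef {
        final String tableName;
        final int partition;

        PartitionRef(String tableName, int partition) {
            this.tableName = tableName;
            this.partition = partition;
        }
    }

    /** Visits each enlisted table once, in first-seen order, and reports what was visited. */
    static List<String> visitTables(List<PartitionRef> enlisted) {
        // Named intermediate: the pipeline is read once, before the loop.
        List<String> tableNames = enlisted.stream()
                .map(ref -> ref.tableName)
                .distinct() // assumption: several partitions may belong to the same table
                .collect(Collectors.toList());

        List<String> visited = new ArrayList<>();

        for (String tableName : tableNames) {
            visited.add("validated " + tableName);
        }

        return visited;
    }

    public static void main(String[] args) {
        List<PartitionRef> enlisted = List.of(
                new PartitionRef("t1", 0),
                new PartitionRef("t1", 1),
                new PartitionRef("t2", 0)
        );

        visitTables(enlisted).forEach(System.out::println);
    }
}
```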





[GitHub] [ignite-3] rpuch commented on a diff in pull request #2055: IGNITE-19228 Schema validation during tx processing: common framework

Posted by "rpuch (via GitHub)" <gi...@apache.org>.
rpuch commented on code in PR #2055:
URL: https://github.com/apache/ignite-3/pull/2055#discussion_r1191261731


##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java:
##########
@@ -976,13 +983,34 @@ private CompletableFuture<Void> processTxFinishAction(TxFinishReplicaRequest req
         List<TablePartitionId> aggregatedGroupIds = request.groups().values().stream()
                 .flatMap(List::stream)
                 .map(IgniteBiTuple::get1)
-                .collect(Collectors.toList());
+                .collect(toList());
 
         UUID txId = request.txId();
 
-        boolean commit = request.commit();
+        return schemaCompatValidator.validateForwards(txId, aggregatedGroupIds, request.commit(), request.commitTimestamp())

Review Comment:
   I see two alternatives to the current approach.
   
   One is to branch on `request.commit()` in the `PartitionReplicaListener` and, if it's a commit, do what is done there now (validate, then invoke `finishAndCleanup()`, then maybe throw); if it's an abort, just call `finishAndCleanup()`. But then `finishAndCleanup()` ends up being called from two different branches, and this seems weird.
   
   Another alternative is to build a method like `validateForwardsIfCommit()`, but it would still produce a future of `ForwardValidationResult`, so we would have to return `success()` if it's an abort. Still looks weird: we kinda did not validate, but we returned a validation result. We could also add a third state to the validation result, like 'no need for validation', but it would work exactly like 'success', so this also looks weird.
   
   What changes do you envision here?
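
   For reference, the current approach can be sketched roughly as follows. This is a simplified model under stated assumptions, not the actual listener code: `Result` is a hypothetical two-state stand-in for `ForwardValidationResult`, the `schemasCompatible` flag replaces the real schema comparison, and turning a failed validation into an abort before cleanup is my assumption about what "then maybe throw" implies. The point it shows is that an abort short-circuits to `success()`, so `finishAndCleanup()` keeps a single call site.

```java
import java.util.concurrent.CompletableFuture;

// Hypothetical sketch of "validate, then finishAndCleanup(), then maybe throw".
final class ForwardValidationSketch {
    /** Two-state result, loosely modeled on ForwardValidationResult. */
    static final class Result {
        private final boolean successful;

        private Result(boolean successful) {
            this.successful = successful;
        }

        static Result success() {
            return new Result(true);
        }

        static Result failure() {
            return new Result(false);
        }

        boolean isSuccessful() {
            return successful;
        }
    }

    /** Number of cleanup invocations, kept so that the single call site is observable. */
    int cleanups;

    /** Commit flag passed to the most recent cleanup. */
    boolean lastCleanupWasCommit;

    /** An abort is reported as success without comparing any schemas. */
    CompletableFuture<Result> validateForwards(boolean commit, boolean schemasCompatible) {
        if (!commit) {
            return CompletableFuture.completedFuture(Result.success());
        }

        return CompletableFuture.completedFuture(schemasCompatible ? Result.success() : Result.failure());
    }

    CompletableFuture<Void> finishAndCleanup(boolean commit) {
        cleanups++;
        lastCleanupWasCommit = commit;

        return CompletableFuture.completedFuture(null);
    }

    /** Listener side: validation, one cleanup call, then an exception if validation failed. */
    CompletableFuture<Void> finish(boolean commit, boolean schemasCompatible) {
        return validateForwards(commit, schemasCompatible).thenCompose(result -> {
            // Assumption: a failed validation downgrades the commit to an abort.
            boolean effectiveCommit = commit && result.isSuccessful();

            return finishAndCleanup(effectiveCommit).thenAccept(ignored -> {
                if (!result.isSuccessful()) {
                    throw new IllegalStateException("Schema is not forward-compatible");
                }
            });
        });
    }
}
```

   With a third 'no need for validation' state, `finish()` would treat that state exactly like `success()`, which is why it adds little here.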





[GitHub] [ignite-3] rpuch commented on a diff in pull request #2055: IGNITE-19228 Schema validation during tx processing: common framework

Posted by "rpuch (via GitHub)" <gi...@apache.org>.
rpuch commented on code in PR #2055:
URL: https://github.com/apache/ignite-3/pull/2055#discussion_r1191307293


##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/schema/NonHistoricSchemas.java:
##########
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.table.distributed.schema;
+
+import static java.util.concurrent.CompletableFuture.completedFuture;
+import static java.util.stream.Collectors.toList;
+import static org.apache.ignite.lang.ErrorGroups.Client.PROTOCOL_ERR;
+
+import java.util.List;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import org.apache.ignite.internal.catalog.commands.DefaultValue;
+import org.apache.ignite.internal.catalog.descriptors.TableColumnDescriptor;
+import org.apache.ignite.internal.hlc.HybridTimestamp;
+import org.apache.ignite.internal.schema.Column;
+import org.apache.ignite.internal.schema.NativeType;
+import org.apache.ignite.internal.schema.NativeTypeSpec;
+import org.apache.ignite.internal.schema.SchemaDescriptor;
+import org.apache.ignite.internal.schema.SchemaManager;
+import org.apache.ignite.internal.schema.SchemaRegistry;
+import org.apache.ignite.lang.IgniteException;
+import org.apache.ignite.sql.ColumnType;
+
+/**
+ * A dummy implementation over {@link SchemaManager}. It is dummy because:
+ *
+ * <ul>
+ *     <li>It imitates historicity, but always takes the latest known schema</li>
+ *     <li>{@link #tableSchemaVersionsBetween(UUID, HybridTimestamp, HybridTimestamp)} always returns a single schema to avoid
+ *     validation failures</li>
+ * </ul>
+ *
+ * <p>The point of this implementation is to allow the system work in the pre-SchemaSync fashion before the switch to CatalogService
+ * is possible.
+ */
+// TODO: IGNITE-19447 - remove when switched to the CatalogService
+public class NonHistoricSchemas implements Schemas {
+    private final SchemaManager schemaManager;
+
+    public NonHistoricSchemas(SchemaManager schemaManager) {
+        this.schemaManager = schemaManager;
+    }
+
+    @Override
+    public CompletableFuture<?> waitForSchemasAvailability(HybridTimestamp ts) {
+        return completedFuture(null);
+    }
+
+    @Override
+    public List<FullTableSchema> tableSchemaVersionsBetween(UUID tableId, HybridTimestamp fromIncluding, HybridTimestamp toIncluding) {
+        SchemaRegistry schemaRegistry = schemaManager.schemaRegistry(tableId);
+
+        SchemaDescriptor schemaDescriptor = schemaRegistry.schema();
+        FullTableSchema fullSchema = new FullTableSchema(
+                1,
+                1,
+                schemaDescriptor.columnNames().stream()
+                        .map(colName -> {
+                            Column column = schemaDescriptor.column(colName);
+
+                            assert column != null;
+
+                            return columnType(colName, column);
+                        })
+                        .collect(toList()),
+                List.of()
+        );
+
+        return List.of(fullSchema);
+    }
+
+    private static TableColumnDescriptor columnType(String colName, Column column) {
+        return new TableColumnDescriptor(
+                colName,
+                columnTypeFromNativeType(column.type()),
+                column.nullable(),
+                DefaultValue.constant(column.defaultValue())

Review Comment:
   Looking at `UnmappedFieldBinding`, it seems correct. What makes you suspicious?


