You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by am...@apache.org on 2021/06/21 12:42:24 UTC

[ignite-3] 01/01: Support column renaming.

This is an automated email from the ASF dual-hosted git repository.

amashenkov pushed a commit to branch ignite-14951
in repository https://gitbox.apache.org/repos/asf/ignite-3.git

commit 9f7e0e8075a40e6a5efde2fcfd02512533d43dcd
Author: Andrew Mashenkov <an...@gmail.com>
AuthorDate: Mon Jun 21 15:42:09 2021 +0300

    Support column renaming.
---
 .../schemas/table/ColumnConfigurationSchema.java   |   2 -
 .../ignite/internal/manager/EventListener.java     |   4 +-
 .../internal/runner/app/SchemaChangeTest.java      | 100 ++++++++++++++++++++-
 .../ColumnMapping.java => ColumnMapper.java}       |  52 ++++++-----
 .../ignite/internal/schema/ColumnMapping.java}     |  35 +++-----
 .../ignite/internal/schema/SchemaDescriptor.java   |  21 ++++-
 .../ignite/internal/schema/SchemaManager.java      |  85 ++++++++++++++++--
 .../SchemaConfigurationConverter.java              |  10 ---
 .../schema/registry/SchemaRegistryImpl.java        |  42 +--------
 .../schema/registry/UpgradingRowAdapter.java       |   4 +-
 .../SchemaConfigurationConverterTest.java          |   2 +-
 .../internal/table/distributed/TableManager.java   |  30 +++++--
 12 files changed, 266 insertions(+), 121 deletions(-)

diff --git a/modules/api/src/main/java/org/apache/ignite/configuration/schemas/table/ColumnConfigurationSchema.java b/modules/api/src/main/java/org/apache/ignite/configuration/schemas/table/ColumnConfigurationSchema.java
index 192090d..ee38059 100644
--- a/modules/api/src/main/java/org/apache/ignite/configuration/schemas/table/ColumnConfigurationSchema.java
+++ b/modules/api/src/main/java/org/apache/ignite/configuration/schemas/table/ColumnConfigurationSchema.java
@@ -20,7 +20,6 @@ package org.apache.ignite.configuration.schemas.table;
 import org.apache.ignite.configuration.annotation.Config;
 import org.apache.ignite.configuration.annotation.ConfigValue;
 import org.apache.ignite.configuration.annotation.Value;
-import org.apache.ignite.configuration.validation.Immutable;
 
 /**
  * Configuration for single column in SQL table.
@@ -29,7 +28,6 @@ import org.apache.ignite.configuration.validation.Immutable;
 public class ColumnConfigurationSchema {
     /** Column name. */
     @Value
-    @Immutable
     public String name;
 
     /** Column type. */
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/manager/EventListener.java b/modules/core/src/main/java/org/apache/ignite/internal/manager/EventListener.java
index b0cb9e3..04e684c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/manager/EventListener.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/manager/EventListener.java
@@ -31,8 +31,8 @@ public interface EventListener<P extends EventParameters> {
      *
      * @param parameters Parameters provide a properties of the event. This attribute cannot be {@code null}.
      * @param exception Exception which is happened during the event produced or {@code null}.
-     * @return True means that the event is handled and a listener will be removed,
-     * false is the listener will stay listen.
+     * @return {@code True} if the event is handled and the listener will be removed,
+     * {@code false} if the listener will keep listening.
      */
     boolean notify(@NotNull P parameters, @Nullable Throwable exception);
 
diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/SchemaChangeTest.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/SchemaChangeTest.java
index 08b5b10..dc620eb 100644
--- a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/SchemaChangeTest.java
+++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/SchemaChangeTest.java
@@ -126,7 +126,10 @@ class SchemaChangeTest {
                         });
             }));
 
-        assertThrows(ColumnNotFoundException.class, () -> kvView1.put(tbl1.tupleBuilder().set("key", 2L).build(), tbl1.tupleBuilder().set("val1", 222).set("val2", "str").build()));
+        assertThrows(ColumnNotFoundException.class, () -> kvView1.put(
+            tbl1.tupleBuilder().set("key", 2L).build(),
+            tbl1.tupleBuilder().set("val1", 222).set("val2", "str").build())
+        );
 
         tbl1.insert(tbl1.tupleBuilder().set("key", 1L).set("val1", 111).build());
         kvView1.put(tbl1.tupleBuilder().set("key", 2L).build(), tbl1.tupleBuilder().set("val1", 222).build());
@@ -195,7 +198,10 @@ class SchemaChangeTest {
         tbl1.insert(tbl1.tupleBuilder().set("key", 1L).set("val1", 111).build());
         kvView1.put(tbl1.tupleBuilder().set("key", 2L).build(), tbl1.tupleBuilder().set("val1", 222).build());
 
-        assertThrows(ColumnNotFoundException.class, () -> kvView1.put(tbl1.tupleBuilder().set("key", 2L).build(), tbl1.tupleBuilder().set("val1", 222).set("val2", "str").build()));
+        assertThrows(ColumnNotFoundException.class, () -> kvView1.put(
+            tbl1.tupleBuilder().set("key", 2L).build(),
+            tbl1.tupleBuilder().set("val1", 222).set("val2", "str").build())
+        );
 
         clusterNodes.get(1).tables().alterTable(schTbl1.canonicalName(),
             chng -> chng.changeColumns(cols -> {
@@ -241,4 +247,94 @@ class SchemaChangeTest {
         assertEquals(222, (Integer)kvView2.get(keyTuple2).value("val1"));
         assertEquals("default", kvView2.get(keyTuple2).value("val2"));
     }
+
+    /**
+     * Check renaming a value column in the table schema: the new name must work for rows written
+     * before and after the rename, and the old name must no longer resolve.
+     */
+    @Test
+    void renameValueColumn() {
+        List<Ignite> clusterNodes = new ArrayList<>();
+
+        for (Map.Entry<String, String> nodeBootstrapCfg : nodesBootstrapCfg.entrySet())
+            clusterNodes.add(IgnitionManager.start(nodeBootstrapCfg.getKey(), nodeBootstrapCfg.getValue()));
+
+        assertEquals(3, clusterNodes.size());
+
+        // Create table on node 0.
+        SchemaTable schTbl1 = SchemaBuilders.tableBuilder("PUBLIC", "tbl1").columns(
+            SchemaBuilders.column("key", ColumnType.INT64).asNonNull().build(),
+            SchemaBuilders.column("val1", ColumnType.INT32).asNullable().build()
+        ).withPrimaryKey("key").build();
+
+        clusterNodes.get(0).tables().createTable(schTbl1.canonicalName(), tblCh ->
+            convert(schTbl1, tblCh)
+                .changeReplicas(1)
+                .changePartitions(10)
+        );
+
+        Table tbl1 = clusterNodes.get(1).tables().table(schTbl1.canonicalName());
+        Table tbl2 = clusterNodes.get(2).tables().table(schTbl1.canonicalName());
+
+        // Put data on node 1.
+        KeyValueBinaryView kvView1 = tbl1.kvView();
+
+        tbl1.insert(tbl1.tupleBuilder().set("key", 1L).set("val1", 111).build());
+        kvView1.put(tbl1.tupleBuilder().set("key", 2L).build(), tbl1.tupleBuilder().set("val1", 222).build());
+
+        assertThrows(ColumnNotFoundException.class, () -> kvView1.put(
+            tbl1.tupleBuilder().set("key", 2L).build(),
+            tbl1.tupleBuilder().set("val2", -222).build())
+        );
+
+        // Rename column 'val1' to 'val2' on node 1.
+        clusterNodes.get(1).tables().alterTable(schTbl1.canonicalName(),
+            tblChanger -> tblChanger.changeColumns(cols -> {
+                final String colKey = tblChanger.columns().namedListKeys().stream()
+                    .filter(c -> "val1".equals(tblChanger.columns().get(c).name()))
+                    .findFirst()
+                    .orElseThrow(() -> new IllegalStateException("Column not found."));
+
+                tblChanger.changeColumns(listChanger ->
+                    listChanger.update(colKey, colChanger -> colChanger.changeName("val2"))
+                );
+            }));
+
+        assertThrows(ColumnNotFoundException.class, () -> kvView1.put(
+            tbl1.tupleBuilder().set("key", 3L).build(),
+            tbl1.tupleBuilder().set("val1", -333).build())
+        );
+
+        kvView1.put(tbl1.tupleBuilder().set("key", 3L).build(), tbl1.tupleBuilder().set("val2", 333).build());
+
+        // Get data on node 2.
+        KeyValueBinaryView kvView2 = tbl2.kvView();
+
+        final Tuple keyTuple3 = tbl2.tupleBuilder().set("key", 3L).build();
+
+        assertEquals(3, (Long)tbl2.get(keyTuple3).value("key"));
+        assertEquals(333, (Integer)tbl2.get(keyTuple3).value("val2"));
+        assertThrows(ColumnNotFoundException.class, () -> tbl2.get(keyTuple3).value("val1"));
+
+        assertEquals(333, (Integer)kvView2.get(keyTuple3).value("val2"));
+        assertThrows(ColumnNotFoundException.class, () -> kvView2.get(keyTuple3).value("val1"));
+
+        // Check old row conversion.
+        final Tuple keyTuple1 = tbl2.tupleBuilder().set("key", 1L).build();
+        final Tuple keyTuple2 = kvView2.tupleBuilder().set("key", 2L).build();
+
+        assertEquals(1, (Long)tbl2.get(keyTuple1).value("key"));
+        assertEquals(111, (Integer)tbl2.get(keyTuple1).value("val2"));
+        assertThrows(ColumnNotFoundException.class, () -> tbl2.get(keyTuple1).value("val1"));
+
+        assertEquals(111, (Integer)kvView2.get(keyTuple1).value("val2"));
+        assertThrows(ColumnNotFoundException.class, () -> kvView2.get(keyTuple1).value("val1"));
+
+        assertEquals(2, (Long)tbl2.get(keyTuple2).value("key"));
+        assertEquals(222, (Integer)tbl2.get(keyTuple2).value("val2"));
+        assertThrows(ColumnNotFoundException.class, () -> tbl2.get(keyTuple2).value("val1"));
+
+        assertEquals(222, (Integer)kvView2.get(keyTuple2).value("val2"));
+        assertThrows(ColumnNotFoundException.class, () -> kvView2.get(keyTuple2).value("val1"));
+    }
 }
diff --git a/modules/schema/src/main/java/org/apache/ignite/internal/schema/registry/ColumnMapping.java b/modules/schema/src/main/java/org/apache/ignite/internal/schema/ColumnMapper.java
similarity index 53%
rename from modules/schema/src/main/java/org/apache/ignite/internal/schema/registry/ColumnMapping.java
rename to modules/schema/src/main/java/org/apache/ignite/internal/schema/ColumnMapper.java
index dbb95a0..eff00ff 100644
--- a/modules/schema/src/main/java/org/apache/ignite/internal/schema/registry/ColumnMapping.java
+++ b/modules/schema/src/main/java/org/apache/ignite/internal/schema/ColumnMapper.java
@@ -15,50 +15,54 @@
  * limitations under the License.
  */
 
-package org.apache.ignite.internal.schema.registry;
-
-import org.apache.ignite.internal.schema.SchemaDescriptor;
+package org.apache.ignite.internal.schema;
 
 /**
- * Column mapping.
+ * Column mapper implementation.
  */
-class ColumnMapping {
+class ColumnMapper implements ColumnMapping {
+    /** Identity mapper. */
+    private static final IdentityMapper IDENTITY_MAPPER = new IdentityMapper();
+
+    /**
+     * @return Identity mapper instance.
+     */
+    static ColumnMapping identityMapping() {
+        return IDENTITY_MAPPER;
+    }
+
     /** Mapping. */
     private final int[] mapping;
 
-    /** First mapped column index. */
-    private final int firstColIdx;
-
     /**
      * @param schema Source schema descriptor.
      */
-    ColumnMapping(SchemaDescriptor schema) {
-        firstColIdx = schema.keyColumns().length();
-        mapping = new int[schema.valueColumns().length()];
+    public ColumnMapper(SchemaDescriptor schema) {
+        mapping = java.util.stream.IntStream.range(0, schema.length()).toArray(); // Identity by default: columns never add()'ed keep their index.
     }
 
     /**
      * Add column mapping.
      *
-     * @param from Column index in source schema.
-     * @param to Column index in schema.
+     * @param from Source column index.
+     * @param to Target column index.
      */
-    void add(int from, int to) {
-        assert from >= firstColIdx && from < firstColIdx + mapping.length;
+    public void add(int from, int to) {
+        mapping[from] = to;
+    }
 
-        mapping[from - firstColIdx] = to;
+    /** {@inheritDoc} */
+    @Override public int map(int idx) {
+        return mapping[idx];
     }
 
     /**
-     * Gets mapped column idx.
-     *
-     * @param idx Column index in source.
-     * @return Column index in targer schema.
+     * Identity column mapper.
      */
-    int map(int idx) {
-        if (idx < firstColIdx)
+    private static class IdentityMapper implements ColumnMapping {
+        /** {@inheritDoc} */
+        @Override public int map(int idx) {
             return idx;
-
-        return mapping[idx - firstColIdx];
+        }
     }
 }
diff --git a/modules/api/src/main/java/org/apache/ignite/configuration/schemas/table/ColumnConfigurationSchema.java b/modules/schema/src/main/java/org/apache/ignite/internal/schema/ColumnMapping.java
similarity index 53%
copy from modules/api/src/main/java/org/apache/ignite/configuration/schemas/table/ColumnConfigurationSchema.java
copy to modules/schema/src/main/java/org/apache/ignite/internal/schema/ColumnMapping.java
index 192090d..43b0f83 100644
--- a/modules/api/src/main/java/org/apache/ignite/configuration/schemas/table/ColumnConfigurationSchema.java
+++ b/modules/schema/src/main/java/org/apache/ignite/internal/schema/ColumnMapping.java
@@ -15,32 +15,19 @@
  * limitations under the License.
  */
 
-package org.apache.ignite.configuration.schemas.table;
+package org.apache.ignite.internal.schema;
 
-import org.apache.ignite.configuration.annotation.Config;
-import org.apache.ignite.configuration.annotation.ConfigValue;
-import org.apache.ignite.configuration.annotation.Value;
-import org.apache.ignite.configuration.validation.Immutable;
+import java.io.Serializable;
 
 /**
- * Configuration for single column in SQL table.
+ * Column mapping.
  */
-@Config
-public class ColumnConfigurationSchema {
-    /** Column name. */
-    @Value
-    @Immutable
-    public String name;
-
-    /** Column type. */
-    @ConfigValue
-    public ColumnTypeConfigurationSchema type;
-
-    /** Nullable flag. */
-    @Value
-    public boolean nullable;
-
-    /** Default value. */
-    @Value(hasDefault = true)
-    public String defaultValue = "";
+public interface ColumnMapping extends Serializable {
+    /**
+     * Map column idx in source schema to column idx in target schema.
+     *
+     * @param idx Column index in source schema.
+     * @return Column index in target schema or {@code -1} if no column exists in target schema.
+     */
+    int map(int idx);
 }
diff --git a/modules/schema/src/main/java/org/apache/ignite/internal/schema/SchemaDescriptor.java b/modules/schema/src/main/java/org/apache/ignite/internal/schema/SchemaDescriptor.java
index 0b14363..4a4c8d1 100644
--- a/modules/schema/src/main/java/org/apache/ignite/internal/schema/SchemaDescriptor.java
+++ b/modules/schema/src/main/java/org/apache/ignite/internal/schema/SchemaDescriptor.java
@@ -32,7 +32,7 @@ import org.jetbrains.annotations.Nullable;
  * Full schema descriptor containing key columns chunk, value columns chunk, and schema version.
  */
 public class SchemaDescriptor implements Serializable {
-    /** Table identifier.*/
+    /** Table identifier. */
     private final UUID tableId;
 
     /** Schema version. Incremented on each schema modification. */
@@ -50,6 +50,9 @@ public class SchemaDescriptor implements Serializable {
     /** Mapping 'Column name' -&gt; Column. */
     private final Map<String, Column> colMap;
 
+    /** Column mapper. */
+    private ColumnMapping colMapper = ColumnMapper.identityMapping();
+
     /**
      * @param tableId Table id.
      * @param ver Schema version.
@@ -162,6 +165,22 @@ public class SchemaDescriptor implements Serializable {
         return colMap.get(name);
     }
 
+    /**
+     * Sets the column mapper used to resolve columns of this schema in rows of the previous schema version.
+     *
+     * @param colMapper Column mapper; replaces the identity mapping that is set by default.
+     */
+    public void columnMapper(ColumnMapping colMapper) {
+        this.colMapper = colMapper;
+    }
+
+    /**
+     * @return Column mapper for the previous schema version (the identity mapping unless another one was set).
+     */
+    public ColumnMapping columnMapper() {
+        return colMapper;
+    }
+
     /** {@inheritDoc} */
     @Override public String toString() {
         return S.toString(SchemaDescriptor.class, this);
diff --git a/modules/schema/src/main/java/org/apache/ignite/internal/schema/SchemaManager.java b/modules/schema/src/main/java/org/apache/ignite/internal/schema/SchemaManager.java
index 8d7f872..94d82aa 100644
--- a/modules/schema/src/main/java/org/apache/ignite/internal/schema/SchemaManager.java
+++ b/modules/schema/src/main/java/org/apache/ignite/internal/schema/SchemaManager.java
@@ -20,13 +20,16 @@ package org.apache.ignite.internal.schema;
 import java.util.Arrays;
 import java.util.HashSet;
 import java.util.Map;
+import java.util.Optional;
 import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutionException;
 import org.apache.ignite.configuration.internal.ConfigurationManager;
+import org.apache.ignite.configuration.schemas.table.ColumnView;
 import org.apache.ignite.configuration.schemas.table.TableConfiguration;
+import org.apache.ignite.configuration.schemas.table.TableView;
 import org.apache.ignite.configuration.schemas.table.TablesConfiguration;
 import org.apache.ignite.internal.manager.Producer;
 import org.apache.ignite.internal.metastorage.MetaStorageManager;
@@ -125,7 +128,7 @@ public class SchemaManager extends Producer<SchemaEvent, SchemaEventParameters>
                             onEvent(SchemaEvent.DROPPED, new SchemaEventParameters(tblId, null), null);
                         }
                         else {
-                            int v = (int)ByteUtils.bytesToLong(evt.newEntry().value(),0);
+                            int v = (int)ByteUtils.bytesToLong(evt.newEntry().value());
 
                             assert reg.lastSchemaVersion() == v;
 
@@ -184,7 +187,7 @@ public class SchemaManager extends Producer<SchemaEvent, SchemaEventParameters>
                 final ByteArray lastVerKey = new ByteArray(INTERNAL_PREFIX + tblId);
                 final ByteArray schemaKey = new ByteArray(INTERNAL_PREFIX + tblId + INTERNAL_VER_SUFFIX + schemaVer);
 
-                SchemaTable schemaTable = SchemaConfigurationConverter.convert(tblConfig);
+                SchemaTable schemaTable = SchemaConfigurationConverter.convert(tblConfig.value());
                 final SchemaDescriptor desc = SchemaDescriptorConverter.convert(tblId, schemaVer, schemaTable);
 
                 return metaStorageMgr.invoke(Conditions.notExists(schemaKey),
@@ -203,24 +206,33 @@ public class SchemaManager extends Producer<SchemaEvent, SchemaEventParameters>
      *
      * @param tblId Table id.
      * @param tblName Table name.
+     * @param oldTbl Old table configuration.
+     * @param newTbl New table configuration.
      * @return Operation future.
      */
-    public CompletableFuture<Boolean> updateSchemaForTable(final UUID tblId, String tblName) {
+    public CompletableFuture<Boolean> updateSchemaForTable(
+        final UUID tblId,
+        String tblName,
+        TableView oldTbl,
+        TableView newTbl
+    ) {
         return vaultMgr.get(ByteArray.fromString(INTERNAL_PREFIX + tblId)).
             thenCompose(entry -> {
-                TableConfiguration tblConfig = configurationMgr.configurationRegistry().
-                    getConfiguration(TablesConfiguration.KEY).tables().get(tblName);
-
                 assert !entry.empty();
 
-                final int oldVer = (int)ByteUtils.bytesToLong(entry.value(), 0);
+                final int oldVer = (int)ByteUtils.bytesToLong(entry.value());
                 final int newVer = oldVer + 1;
 
                 final ByteArray lastVerKey = new ByteArray(INTERNAL_PREFIX + tblId);
                 final ByteArray schemaKey = new ByteArray(INTERNAL_PREFIX + tblId + INTERNAL_VER_SUFFIX + newVer);
 
-                SchemaTable schemaTable = SchemaConfigurationConverter.convert(tblConfig);
-                final SchemaDescriptor desc = SchemaDescriptorConverter.convert(tblId, newVer, schemaTable);
+                final SchemaDescriptor desc = SchemaDescriptorConverter.convert(tblId, newVer, SchemaConfigurationConverter.convert(newTbl));
+
+                desc.columnMapper(columnMapper(
+                    schemaRegistryForTable(tblId).schema(oldVer),
+                    oldTbl,
+                    desc,
+                    newTbl));
 
                 return metaStorageMgr.invoke(Conditions.notExists(schemaKey),
                     Operations.put(schemaKey, ByteUtils.toBytes(desc)),
@@ -234,6 +246,61 @@ public class SchemaManager extends Producer<SchemaEvent, SchemaEventParameters>
     }
 
     /**
+     * @param oldDesc Old schema descriptor.
+     * @param oldTbl Old table configuration.
+     * @param newDesc New schema descriptor.
+     * @param newTbl New table configuration.
+     * @return Mapper from new-schema column indices to old-schema ones, or the identity mapping when no column was added or moved.
+     */
+    private ColumnMapping columnMapper(
+        SchemaDescriptor oldDesc,
+        TableView oldTbl,
+        SchemaDescriptor newDesc,
+        TableView newTbl) {
+        ColumnMapper mapper = null;
+
+        for (String s : newTbl.columns().namedListKeys()) {
+            final ColumnView newColView = newTbl.columns().get(s);
+            final ColumnView oldColView = oldTbl.columns().get(s);
+
+            if (oldColView == null) {
+                assert !newDesc.isKeyColumn(newDesc.column(newColView.name()).schemaIndex());
+
+                if (mapper == null)
+                    mapper = new ColumnMapper(newDesc);
+
+                mapper.add(newDesc.column(newColView.name()).schemaIndex(), -1); // New column added.
+            }
+            else {
+                final Column newCol = newDesc.column(newColView.name());
+                final Column oldCol = oldDesc.column(oldColView.name());
+
+                if (!newCol.type().equals(oldCol.type()) || newCol.nullable() != oldCol.nullable())
+                    throw new InvalidTypeException("Column of incompatible type: [schemaVer=" + newDesc.version() + ", col=" + newCol + ']');
+
+                if (newCol.schemaIndex() == oldCol.schemaIndex())
+                    continue; // Column kept its position (e.g. only 'default' changed), so no mapping entry is needed.
+
+                if (mapper == null)
+                    mapper = new ColumnMapper(newDesc);
+
+                mapper.add(newCol.schemaIndex(), oldCol.schemaIndex());
+            }
+        }
+
+        final Optional<Column> droppedKeyCol = oldTbl.columns().namedListKeys().stream()
+            .filter(k -> newTbl.columns().get(k) == null)
+            .map(k -> oldDesc.column(oldTbl.columns().get(k).name()))
+            .filter(c -> oldDesc.isKeyColumn(c.schemaIndex()))
+            .findAny();
+
+        if (droppedKeyCol.isPresent())
+            throw new SchemaRegistryException("Dropping of key column is forbidden: [schemaVer=" + newDesc.version() + ", col=" + droppedKeyCol.get() + ']');
+
+        return mapper == null ? ColumnMapper.identityMapping() : mapper;
+    }
+
+    /**
      * Return table schema of certain version from history.
      *
      * @param tblId Table id.
diff --git a/modules/schema/src/main/java/org/apache/ignite/internal/schema/configuration/SchemaConfigurationConverter.java b/modules/schema/src/main/java/org/apache/ignite/internal/schema/configuration/SchemaConfigurationConverter.java
index b943f83..9b41a07 100644
--- a/modules/schema/src/main/java/org/apache/ignite/internal/schema/configuration/SchemaConfigurationConverter.java
+++ b/modules/schema/src/main/java/org/apache/ignite/internal/schema/configuration/SchemaConfigurationConverter.java
@@ -398,16 +398,6 @@ public class SchemaConfigurationConverter {
     }
 
     /**
-     * Convert TableConfiguration to SchemaTable.
-     *
-     * @param tblCfg TableConfiguration to convert.
-     * @return SchemaTable.
-     */
-    public static SchemaTable convert(TableConfiguration tblCfg) {
-        return convert(tblCfg.value());
-    }
-
-    /**
      * Convert configuration to SchemaTable.
      *
      * @param tblView TableView to convert.
diff --git a/modules/schema/src/main/java/org/apache/ignite/internal/schema/registry/SchemaRegistryImpl.java b/modules/schema/src/main/java/org/apache/ignite/internal/schema/registry/SchemaRegistryImpl.java
index a2fac90..b934462 100644
--- a/modules/schema/src/main/java/org/apache/ignite/internal/schema/registry/SchemaRegistryImpl.java
+++ b/modules/schema/src/main/java/org/apache/ignite/internal/schema/registry/SchemaRegistryImpl.java
@@ -17,13 +17,9 @@
 
 package org.apache.ignite.internal.schema.registry;
 
-import java.util.NoSuchElementException;
 import java.util.concurrent.ConcurrentSkipListMap;
 import java.util.function.Function;
 import org.apache.ignite.internal.schema.BinaryRow;
-import org.apache.ignite.internal.schema.Column;
-import org.apache.ignite.internal.schema.Columns;
-import org.apache.ignite.internal.schema.InvalidTypeException;
 import org.apache.ignite.internal.schema.Row;
 import org.apache.ignite.internal.schema.SchemaDescriptor;
 import org.apache.ignite.internal.schema.SchemaRegistry;
@@ -110,42 +106,12 @@ public class SchemaRegistryImpl implements SchemaRegistry {
         if (curSchema.version() == rowSchema.version())
             return new Row(rowSchema, row);
 
-        return new UpgradingRowAdapter(curSchema, row, columnMapper(curSchema, rowSchema));
-    }
-
-    /**
-     * Create column mapping for schemas.
-     *
-     * @param src Source schema of newer version.
-     * @param dst Target schema of older version.
-     * @return Column mapping.
-     */
-    private ColumnMapping columnMapper(SchemaDescriptor src, SchemaDescriptor dst) {
-        assert src.version() > dst.version();
-        assert src.version() == dst.version() + 1; // TODO: IGNITE-14863 implement merged mapper for arbitraty schema versions.
-
-        final Columns srcCols = src.valueColumns();
-        final Columns dstCols = dst.valueColumns();
+        assert curSchema.version() >= rowSchema.version();
 
-        final ColumnMapping mapping = new ColumnMapping(src);
-
-        for (int i = 0; i < srcCols.columns().length; i++) {
-            final Column col = srcCols.column(i);
-
-            try {
-                final int idx = dstCols.columnIndex(col.name());
-
-                if (!col.equals(dstCols.column(idx)))
-                    throw new InvalidTypeException("Column of incompatible type: [colIdx=" + col.schemaIndex() + ", schemaVer=" + src.version());
-
-                mapping.add(col.schemaIndex(), dst.keyColumns().length() + idx);
-            }
-            catch (NoSuchElementException ex) {
-                mapping.add(col.schemaIndex(), -1);
-            }
-        }
+        // TODO: IGNITE-14864: implement merged mapper for arbitrary schema versions.
+        assert curSchema.version() == rowSchema.version() + 1 : "Mapper merging is not supported yet.";
 
-        return mapping;
+        return new UpgradingRowAdapter(curSchema, row, curSchema.columnMapper());
     }
 
     /**
diff --git a/modules/schema/src/main/java/org/apache/ignite/internal/schema/registry/UpgradingRowAdapter.java b/modules/schema/src/main/java/org/apache/ignite/internal/schema/registry/UpgradingRowAdapter.java
index 8c9a4fc..3fb0fd3 100644
--- a/modules/schema/src/main/java/org/apache/ignite/internal/schema/registry/UpgradingRowAdapter.java
+++ b/modules/schema/src/main/java/org/apache/ignite/internal/schema/registry/UpgradingRowAdapter.java
@@ -18,6 +18,7 @@
 package org.apache.ignite.internal.schema.registry;
 
 import org.apache.ignite.internal.schema.BinaryRow;
+import org.apache.ignite.internal.schema.ColumnMapping;
 import org.apache.ignite.internal.schema.InvalidTypeException;
 import org.apache.ignite.internal.schema.NativeTypeSpec;
 import org.apache.ignite.internal.schema.Row;
@@ -43,9 +44,6 @@ class UpgradingRowAdapter extends Row {
 
     /** {@inheritDoc} */
     @Override protected long findColumn(int colIdx, NativeTypeSpec type) throws InvalidTypeException {
-        if (schema.isKeyColumn(colIdx))
-            return super.findColumn(colIdx, type);
-
         int mapIdx = mapping.map(colIdx);
 
         return (mapIdx < 0) ? Long.MIN_VALUE : super.findColumn(mapIdx, type);
diff --git a/modules/schema/src/test/java/org/apache/ignite/internal/schema/configuration/SchemaConfigurationConverterTest.java b/modules/schema/src/test/java/org/apache/ignite/internal/schema/configuration/SchemaConfigurationConverterTest.java
index 1690b5a..6b524cd 100644
--- a/modules/schema/src/test/java/org/apache/ignite/internal/schema/configuration/SchemaConfigurationConverterTest.java
+++ b/modules/schema/src/test/java/org/apache/ignite/internal/schema/configuration/SchemaConfigurationConverterTest.java
@@ -193,7 +193,7 @@ public class SchemaConfigurationConverterTest {
         TableConfiguration tblCfg = confRegistry.getConfiguration(TablesConfiguration.KEY).tables()
             .get(tbl.canonicalName());
 
-        SchemaTable tbl2 = SchemaConfigurationConverter.convert(tblCfg);
+        SchemaTable tbl2 = SchemaConfigurationConverter.convert(tblCfg.value());
 
         assertEquals(tbl.canonicalName(), tbl2.canonicalName());
         assertEquals(tbl.indices().size(), tbl2.indices().size());
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
index 1937789..95dd2d5 100644
--- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
@@ -25,6 +25,7 @@ import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
 import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
@@ -34,6 +35,8 @@ import java.util.function.Consumer;
 import java.util.stream.Collectors;
 import org.apache.ignite.configuration.internal.ConfigurationManager;
 import org.apache.ignite.configuration.internal.util.ConfigurationUtil;
+import org.apache.ignite.configuration.notifications.ConfigurationNotificationEvent;
+import org.apache.ignite.configuration.schemas.table.ColumnView;
 import org.apache.ignite.configuration.schemas.table.TableChange;
 import org.apache.ignite.configuration.schemas.table.TableView;
 import org.apache.ignite.configuration.schemas.table.TablesConfiguration;
@@ -220,8 +223,21 @@ public class TableManager extends Producer<TableEvent, TableEventParameters> imp
 
                             assert newTbl.columns().namedListKeys() != null && oldTbl.columns().namedListKeys() != null;
 
-                            return newTbl.columns().namedListKeys().stream().anyMatch(c -> !oldTbl.columns().namedListKeys().contains(c)) ||
-                                oldTbl.columns().namedListKeys().stream().anyMatch(c -> !newTbl.columns().namedListKeys().contains(c));
+                            if (!newTbl.columns().namedListKeys().equals(oldTbl.columns().namedListKeys()))
+                                return true;
+
+                            return newTbl.columns().namedListKeys().stream().anyMatch(k -> {
+                                final ColumnView newCol = newTbl.columns().get(k);
+                                final ColumnView oldCol = oldTbl.columns().get(k);
+
+                                assert oldCol != null;
+
+                                assert Objects.equals(newCol.type(), oldCol.type()) : "Columns type change is not supported.";
+                                assert Objects.equals(newCol.nullable(), oldCol.nullable()) : "Column nullability change is not supported";
+
+                                return !Objects.equals(newCol.name(), oldCol.name()) ||
+                                    !Objects.equals(newCol.defaultValue(), oldCol.defaultValue());
+                            });
                         }).collect(Collectors.toSet()) :
                     Collections.emptySet();
 
@@ -229,7 +245,7 @@ public class TableManager extends Producer<TableEvent, TableEventParameters> imp
                 futs.addAll(startTables(tablesToStart, ctx.storageRevision(), ctx.newValue()));
 
             if (!schemaChanged.isEmpty())
-                futs.addAll(changeSchema(schemaChanged));
+                futs.addAll(changeSchema(ctx, schemaChanged));
 
             if (!tablesToStop.isEmpty())
                 futs.addAll(stopTables(tablesToStop));
@@ -376,10 +392,14 @@ public class TableManager extends Producer<TableEvent, TableEventParameters> imp
     /**
      * Start tables routine.
      *
+     * @param ctx Configuration change context.
      * @param tbls Tables to start.
      * @return Table creation futures.
      */
-    private List<CompletableFuture<Boolean>> changeSchema(Set<String> tbls) {
+    private List<CompletableFuture<Boolean>> changeSchema(
+        ConfigurationNotificationEvent<NamedListView<TableView>> ctx,
+        Set<String> tbls
+    ) {
         boolean hasMetastorageLocally = metaStorageMgr.hasMetastorageLocally(configurationMgr);
 
         List<CompletableFuture<Boolean>> futs = new ArrayList<>();
@@ -392,7 +412,7 @@ public class TableManager extends Producer<TableEvent, TableEventParameters> imp
             final int ver = tbl.schemaView().lastSchemaVersion() + 1;
 
             if (hasMetastorageLocally)
-                futs.add(schemaMgr.updateSchemaForTable(tblId, tblName));
+                futs.add(schemaMgr.updateSchemaForTable(tblId, tblName, ctx.oldValue().get(tblName), ctx.newValue().get(tblName)));
 
             final CompletableFuture<SchemaEventParameters> schemaReadyFut = new CompletableFuture<>();