You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by am...@apache.org on 2021/06/21 12:42:24 UTC
[ignite-3] 01/01: Support column renaming.
This is an automated email from the ASF dual-hosted git repository.
amashenkov pushed a commit to branch ignite-14951
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
commit 9f7e0e8075a40e6a5efde2fcfd02512533d43dcd
Author: Andrew Mashenkov <an...@gmail.com>
AuthorDate: Mon Jun 21 15:42:09 2021 +0300
Support column renaming.
---
.../schemas/table/ColumnConfigurationSchema.java | 2 -
.../ignite/internal/manager/EventListener.java | 4 +-
.../internal/runner/app/SchemaChangeTest.java | 100 ++++++++++++++++++++-
.../ColumnMapping.java => ColumnMapper.java} | 52 ++++++-----
.../ignite/internal/schema/ColumnMapping.java} | 35 +++-----
.../ignite/internal/schema/SchemaDescriptor.java | 21 ++++-
.../ignite/internal/schema/SchemaManager.java | 85 ++++++++++++++++--
.../SchemaConfigurationConverter.java | 10 ---
.../schema/registry/SchemaRegistryImpl.java | 42 +--------
.../schema/registry/UpgradingRowAdapter.java | 4 +-
.../SchemaConfigurationConverterTest.java | 2 +-
.../internal/table/distributed/TableManager.java | 30 +++++--
12 files changed, 266 insertions(+), 121 deletions(-)
diff --git a/modules/api/src/main/java/org/apache/ignite/configuration/schemas/table/ColumnConfigurationSchema.java b/modules/api/src/main/java/org/apache/ignite/configuration/schemas/table/ColumnConfigurationSchema.java
index 192090d..ee38059 100644
--- a/modules/api/src/main/java/org/apache/ignite/configuration/schemas/table/ColumnConfigurationSchema.java
+++ b/modules/api/src/main/java/org/apache/ignite/configuration/schemas/table/ColumnConfigurationSchema.java
@@ -20,7 +20,6 @@ package org.apache.ignite.configuration.schemas.table;
import org.apache.ignite.configuration.annotation.Config;
import org.apache.ignite.configuration.annotation.ConfigValue;
import org.apache.ignite.configuration.annotation.Value;
-import org.apache.ignite.configuration.validation.Immutable;
/**
* Configuration for single column in SQL table.
@@ -29,7 +28,6 @@ import org.apache.ignite.configuration.validation.Immutable;
public class ColumnConfigurationSchema {
/** Column name. */
@Value
- @Immutable
public String name;
/** Column type. */
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/manager/EventListener.java b/modules/core/src/main/java/org/apache/ignite/internal/manager/EventListener.java
index b0cb9e3..04e684c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/manager/EventListener.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/manager/EventListener.java
@@ -31,8 +31,8 @@ public interface EventListener<P extends EventParameters> {
*
* @param parameters Parameters provide a properties of the event. This attribute cannot be {@code null}.
* @param exception Exception which is happened during the event produced or {@code null}.
- * @return True means that the event is handled and a listener will be removed,
- * false is the listener will stay listen.
+ * @return {@code True} means that the event is handled and a listener will be removed,
+ * {@code false} means that the listener will keep listening.
*/
boolean notify(@NotNull P parameters, @Nullable Throwable exception);
diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/SchemaChangeTest.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/SchemaChangeTest.java
index 08b5b10..dc620eb 100644
--- a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/SchemaChangeTest.java
+++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/SchemaChangeTest.java
@@ -126,7 +126,10 @@ class SchemaChangeTest {
});
}));
- assertThrows(ColumnNotFoundException.class, () -> kvView1.put(tbl1.tupleBuilder().set("key", 2L).build(), tbl1.tupleBuilder().set("val1", 222).set("val2", "str").build()));
+ assertThrows(ColumnNotFoundException.class, () -> kvView1.put(
+ tbl1.tupleBuilder().set("key", 2L).build(),
+ tbl1.tupleBuilder().set("val1", 222).set("val2", "str").build())
+ );
tbl1.insert(tbl1.tupleBuilder().set("key", 1L).set("val1", 111).build());
kvView1.put(tbl1.tupleBuilder().set("key", 2L).build(), tbl1.tupleBuilder().set("val1", 222).build());
@@ -195,7 +198,10 @@ class SchemaChangeTest {
tbl1.insert(tbl1.tupleBuilder().set("key", 1L).set("val1", 111).build());
kvView1.put(tbl1.tupleBuilder().set("key", 2L).build(), tbl1.tupleBuilder().set("val1", 222).build());
- assertThrows(ColumnNotFoundException.class, () -> kvView1.put(tbl1.tupleBuilder().set("key", 2L).build(), tbl1.tupleBuilder().set("val1", 222).set("val2", "str").build()));
+ assertThrows(ColumnNotFoundException.class, () -> kvView1.put(
+ tbl1.tupleBuilder().set("key", 2L).build(),
+ tbl1.tupleBuilder().set("val1", 222).set("val2", "str").build())
+ );
clusterNodes.get(1).tables().alterTable(schTbl1.canonicalName(),
chng -> chng.changeColumns(cols -> {
@@ -241,4 +247,94 @@ class SchemaChangeTest {
assertEquals(222, (Integer)kvView2.get(keyTuple2).value("val1"));
assertEquals("default", kvView2.get(keyTuple2).value("val2"));
}
+
+ /**
+ * Check renaming of a column in table schema.
+ */
+ @Test
+ void renameValueColumn() {
+ List<Ignite> clusterNodes = new ArrayList<>();
+
+ for (Map.Entry<String, String> nodeBootstrapCfg : nodesBootstrapCfg.entrySet())
+ clusterNodes.add(IgnitionManager.start(nodeBootstrapCfg.getKey(), nodeBootstrapCfg.getValue()));
+
+ assertEquals(3, clusterNodes.size());
+
+ // Create table on node 0.
+ SchemaTable schTbl1 = SchemaBuilders.tableBuilder("PUBLIC", "tbl1").columns(
+ SchemaBuilders.column("key", ColumnType.INT64).asNonNull().build(),
+ SchemaBuilders.column("val1", ColumnType.INT32).asNullable().build()
+ ).withPrimaryKey("key").build();
+
+ clusterNodes.get(0).tables().createTable(schTbl1.canonicalName(), tblCh ->
+ convert(schTbl1, tblCh)
+ .changeReplicas(1)
+ .changePartitions(10)
+ );
+
+ Table tbl1 = clusterNodes.get(1).tables().table(schTbl1.canonicalName());
+ Table tbl2 = clusterNodes.get(2).tables().table(schTbl1.canonicalName());
+
+ // Put data on node 1.
+ KeyValueBinaryView kvView1 = tbl1.kvView();
+
+ tbl1.insert(tbl1.tupleBuilder().set("key", 1L).set("val1", 111).build());
+ kvView1.put(tbl1.tupleBuilder().set("key", 2L).build(), tbl1.tupleBuilder().set("val1", 222).build());
+
+ assertThrows(ColumnNotFoundException.class, () -> kvView1.put(
+ tbl1.tupleBuilder().set("key", 2L).build(),
+ tbl1.tupleBuilder().set("val2", -222).build())
+ );
+
+ clusterNodes.get(1).tables().alterTable(schTbl1.canonicalName(),
+ tblChanger -> tblChanger.changeColumns(cols -> {
+ final String colKey = tblChanger.columns().namedListKeys().stream()
+ .filter(c -> "val1".equals(tblChanger.columns().get(c).name()))
+ .findFirst()
+ .orElseThrow(() -> {
+ throw new IllegalStateException("Column not found.");
+ });
+
+ tblChanger.changeColumns(listChanger ->
+ listChanger.update(colKey, colChanger -> colChanger.changeName("val2"))
+ );
+ }));
+
+ assertThrows(ColumnNotFoundException.class, () -> kvView1.put(
+ tbl1.tupleBuilder().set("key", 3L).build(),
+ tbl1.tupleBuilder().set("val1", -333).build())
+ );
+
+ kvView1.put(tbl1.tupleBuilder().set("key", 3L).build(), tbl1.tupleBuilder().set("val2", 333).build());
+
+ // Get data on node 2.
+ KeyValueBinaryView kvView2 = tbl2.kvView();
+
+ final Tuple keyTuple3 = tbl2.tupleBuilder().set("key", 3L).build();
+
+ assertEquals(3, (Long)tbl2.get(keyTuple3).value("key"));
+ assertEquals(333, (Integer)tbl2.get(keyTuple3).value("val2"));
+ assertThrows(ColumnNotFoundException.class, () -> tbl2.get(keyTuple3).value("val1"));
+
+ assertEquals(333, (Integer)kvView2.get(keyTuple3).value("val2"));
+ assertThrows(ColumnNotFoundException.class, () -> kvView2.get(keyTuple3).value("val1"));
+
+ // Check old row conversion.
+ final Tuple keyTuple1 = tbl2.tupleBuilder().set("key", 1L).build();
+ final Tuple keyTuple2 = kvView2.tupleBuilder().set("key", 2L).build();
+
+ assertEquals(1, (Long)tbl2.get(keyTuple1).value("key"));
+ assertEquals(111, (Integer)tbl2.get(keyTuple1).value("val2"));
+ assertThrows(ColumnNotFoundException.class, () -> tbl2.get(keyTuple1).value("val1"));
+
+ assertEquals(111, (Integer)kvView2.get(keyTuple1).value("val2"));
+ assertThrows(ColumnNotFoundException.class, () -> kvView2.get(keyTuple1).value("val1"));
+
+ assertEquals(2, (Long)tbl2.get(keyTuple2).value("key"));
+ assertEquals(222, (Integer)tbl2.get(keyTuple2).value("val2"));
+ assertThrows(ColumnNotFoundException.class, () -> tbl2.get(keyTuple2).value("val1"));
+
+ assertEquals(222, (Integer)kvView2.get(keyTuple2).value("val2"));
+ assertThrows(ColumnNotFoundException.class, () -> kvView2.get(keyTuple2).value("val1"));
+ }
}
diff --git a/modules/schema/src/main/java/org/apache/ignite/internal/schema/registry/ColumnMapping.java b/modules/schema/src/main/java/org/apache/ignite/internal/schema/ColumnMapper.java
similarity index 53%
rename from modules/schema/src/main/java/org/apache/ignite/internal/schema/registry/ColumnMapping.java
rename to modules/schema/src/main/java/org/apache/ignite/internal/schema/ColumnMapper.java
index dbb95a0..eff00ff 100644
--- a/modules/schema/src/main/java/org/apache/ignite/internal/schema/registry/ColumnMapping.java
+++ b/modules/schema/src/main/java/org/apache/ignite/internal/schema/ColumnMapper.java
@@ -15,50 +15,54 @@
* limitations under the License.
*/
-package org.apache.ignite.internal.schema.registry;
-
-import org.apache.ignite.internal.schema.SchemaDescriptor;
+package org.apache.ignite.internal.schema;
/**
- * Column mapping.
+ * Column mapper implementation.
*/
-class ColumnMapping {
+class ColumnMapper implements ColumnMapping {
+ /** Identity mapper. */
+ private static final IdentityMapper IDENTITY_MAPPER = new IdentityMapper();
+
+ /**
+ * @return Identity mapper instance.
+ */
+ static ColumnMapping identityMapping() {
+ return IDENTITY_MAPPER;
+ }
+
/** Mapping. */
private final int[] mapping;
- /** First mapped column index. */
- private final int firstColIdx;
-
/**
* @param schema Source schema descriptor.
*/
- ColumnMapping(SchemaDescriptor schema) {
- firstColIdx = schema.keyColumns().length();
- mapping = new int[schema.valueColumns().length()];
+ public ColumnMapper(SchemaDescriptor schema) {
+ mapping = new int[schema.length()];
}
/**
* Add column mapping.
*
- * @param from Column index in source schema.
- * @param to Column index in schema.
+ * @param from Source column index.
+ * @param to Target column index.
*/
- void add(int from, int to) {
- assert from >= firstColIdx && from < firstColIdx + mapping.length;
+ public void add(int from, int to) {
+ mapping[from] = to;
+ }
- mapping[from - firstColIdx] = to;
+ /** {@inheritDoc} */
+ @Override public int map(int idx) {
+ return mapping[idx];
}
/**
- * Gets mapped column idx.
- *
- * @param idx Column index in source.
- * @return Column index in targer schema.
+ * Identity column mapper.
*/
- int map(int idx) {
- if (idx < firstColIdx)
+ private static class IdentityMapper implements ColumnMapping {
+ /** {@inheritDoc} */
+ @Override public int map(int idx) {
return idx;
-
- return mapping[idx - firstColIdx];
+ }
}
}
diff --git a/modules/api/src/main/java/org/apache/ignite/configuration/schemas/table/ColumnConfigurationSchema.java b/modules/schema/src/main/java/org/apache/ignite/internal/schema/ColumnMapping.java
similarity index 53%
copy from modules/api/src/main/java/org/apache/ignite/configuration/schemas/table/ColumnConfigurationSchema.java
copy to modules/schema/src/main/java/org/apache/ignite/internal/schema/ColumnMapping.java
index 192090d..43b0f83 100644
--- a/modules/api/src/main/java/org/apache/ignite/configuration/schemas/table/ColumnConfigurationSchema.java
+++ b/modules/schema/src/main/java/org/apache/ignite/internal/schema/ColumnMapping.java
@@ -15,32 +15,19 @@
* limitations under the License.
*/
-package org.apache.ignite.configuration.schemas.table;
+package org.apache.ignite.internal.schema;
-import org.apache.ignite.configuration.annotation.Config;
-import org.apache.ignite.configuration.annotation.ConfigValue;
-import org.apache.ignite.configuration.annotation.Value;
-import org.apache.ignite.configuration.validation.Immutable;
+import java.io.Serializable;
/**
- * Configuration for single column in SQL table.
+ * Column mapping.
*/
-@Config
-public class ColumnConfigurationSchema {
- /** Column name. */
- @Value
- @Immutable
- public String name;
-
- /** Column type. */
- @ConfigValue
- public ColumnTypeConfigurationSchema type;
-
- /** Nullable flag. */
- @Value
- public boolean nullable;
-
- /** Default value. */
- @Value(hasDefault = true)
- public String defaultValue = "";
+public interface ColumnMapping extends Serializable {
+ /**
+ * Map column idx in source schema to column idx in target schema.
+ *
+ * @param idx Column index in source schema.
+ * @return Column index in target schema or {@code -1} if no column exists in target schema.
+ */
+ int map(int idx);
}
diff --git a/modules/schema/src/main/java/org/apache/ignite/internal/schema/SchemaDescriptor.java b/modules/schema/src/main/java/org/apache/ignite/internal/schema/SchemaDescriptor.java
index 0b14363..4a4c8d1 100644
--- a/modules/schema/src/main/java/org/apache/ignite/internal/schema/SchemaDescriptor.java
+++ b/modules/schema/src/main/java/org/apache/ignite/internal/schema/SchemaDescriptor.java
@@ -32,7 +32,7 @@ import org.jetbrains.annotations.Nullable;
* Full schema descriptor containing key columns chunk, value columns chunk, and schema version.
*/
public class SchemaDescriptor implements Serializable {
- /** Table identifier.*/
+ /** Table identifier. */
private final UUID tableId;
/** Schema version. Incremented on each schema modification. */
@@ -50,6 +50,9 @@ public class SchemaDescriptor implements Serializable {
/** Mapping 'Column name' -> Column. */
private final Map<String, Column> colMap;
+ /** Column mapper. */
+ private ColumnMapping colMapper = ColumnMapper.identityMapping();
+
/**
* @param tableId Table id.
* @param ver Schema version.
@@ -162,6 +165,22 @@ public class SchemaDescriptor implements Serializable {
return colMap.get(name);
}
+ /**
+ * Sets column mapper for previous schema version.
+ *
+ * @param colMapper Column mapper.
+ */
+ public void columnMapper(ColumnMapping colMapper) {
+ this.colMapper = colMapper;
+ }
+
+ /**
+ * @return Column mapper.
+ */
+ public ColumnMapping columnMapper() {
+ return colMapper;
+ }
+
/** {@inheritDoc} */
@Override public String toString() {
return S.toString(SchemaDescriptor.class, this);
diff --git a/modules/schema/src/main/java/org/apache/ignite/internal/schema/SchemaManager.java b/modules/schema/src/main/java/org/apache/ignite/internal/schema/SchemaManager.java
index 8d7f872..94d82aa 100644
--- a/modules/schema/src/main/java/org/apache/ignite/internal/schema/SchemaManager.java
+++ b/modules/schema/src/main/java/org/apache/ignite/internal/schema/SchemaManager.java
@@ -20,13 +20,16 @@ package org.apache.ignite.internal.schema;
import java.util.Arrays;
import java.util.HashSet;
import java.util.Map;
+import java.util.Optional;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import org.apache.ignite.configuration.internal.ConfigurationManager;
+import org.apache.ignite.configuration.schemas.table.ColumnView;
import org.apache.ignite.configuration.schemas.table.TableConfiguration;
+import org.apache.ignite.configuration.schemas.table.TableView;
import org.apache.ignite.configuration.schemas.table.TablesConfiguration;
import org.apache.ignite.internal.manager.Producer;
import org.apache.ignite.internal.metastorage.MetaStorageManager;
@@ -125,7 +128,7 @@ public class SchemaManager extends Producer<SchemaEvent, SchemaEventParameters>
onEvent(SchemaEvent.DROPPED, new SchemaEventParameters(tblId, null), null);
}
else {
- int v = (int)ByteUtils.bytesToLong(evt.newEntry().value(),0);
+ int v = (int)ByteUtils.bytesToLong(evt.newEntry().value());
assert reg.lastSchemaVersion() == v;
@@ -184,7 +187,7 @@ public class SchemaManager extends Producer<SchemaEvent, SchemaEventParameters>
final ByteArray lastVerKey = new ByteArray(INTERNAL_PREFIX + tblId);
final ByteArray schemaKey = new ByteArray(INTERNAL_PREFIX + tblId + INTERNAL_VER_SUFFIX + schemaVer);
- SchemaTable schemaTable = SchemaConfigurationConverter.convert(tblConfig);
+ SchemaTable schemaTable = SchemaConfigurationConverter.convert(tblConfig.value());
final SchemaDescriptor desc = SchemaDescriptorConverter.convert(tblId, schemaVer, schemaTable);
return metaStorageMgr.invoke(Conditions.notExists(schemaKey),
@@ -203,24 +206,33 @@ public class SchemaManager extends Producer<SchemaEvent, SchemaEventParameters>
*
* @param tblId Table id.
* @param tblName Table name.
+ * @param oldTbl Old table configuration.
+ * @param newTbl New table configuration.
* @return Operation future.
*/
- public CompletableFuture<Boolean> updateSchemaForTable(final UUID tblId, String tblName) {
+ public CompletableFuture<Boolean> updateSchemaForTable(
+ final UUID tblId,
+ String tblName,
+ TableView oldTbl,
+ TableView newTbl
+ ) {
return vaultMgr.get(ByteArray.fromString(INTERNAL_PREFIX + tblId)).
thenCompose(entry -> {
- TableConfiguration tblConfig = configurationMgr.configurationRegistry().
- getConfiguration(TablesConfiguration.KEY).tables().get(tblName);
-
assert !entry.empty();
- final int oldVer = (int)ByteUtils.bytesToLong(entry.value(), 0);
+ final int oldVer = (int)ByteUtils.bytesToLong(entry.value());
final int newVer = oldVer + 1;
final ByteArray lastVerKey = new ByteArray(INTERNAL_PREFIX + tblId);
final ByteArray schemaKey = new ByteArray(INTERNAL_PREFIX + tblId + INTERNAL_VER_SUFFIX + newVer);
- SchemaTable schemaTable = SchemaConfigurationConverter.convert(tblConfig);
- final SchemaDescriptor desc = SchemaDescriptorConverter.convert(tblId, newVer, schemaTable);
+ final SchemaDescriptor desc = SchemaDescriptorConverter.convert(tblId, newVer, SchemaConfigurationConverter.convert(newTbl));
+
+ desc.columnMapper(columnMapper(
+ schemaRegistryForTable(tblId).schema(oldVer),
+ oldTbl,
+ desc,
+ newTbl));
return metaStorageMgr.invoke(Conditions.notExists(schemaKey),
Operations.put(schemaKey, ByteUtils.toBytes(desc)),
@@ -234,6 +246,61 @@ public class SchemaManager extends Producer<SchemaEvent, SchemaEventParameters>
}
/**
+ * @param oldDesc Old schema descriptor.
+ * @param oldTbl Old table configuration.
+ * @param newDesc New schema descriptor.
+ * @param newTbl New table configuration.
+ * @return Column mapper.
+ */
+ private ColumnMapping columnMapper(
+ SchemaDescriptor oldDesc,
+ TableView oldTbl,
+ SchemaDescriptor newDesc,
+ TableView newTbl) {
+ // Every column listed in newTbl gets an explicit entry below, so no visited slot keeps the int[] default of 0.
+ final ColumnMapper mapper = new ColumnMapper(newDesc);
+ boolean identity = true;
+
+ for (String s : newTbl.columns().namedListKeys()) {
+ final ColumnView newColView = newTbl.columns().get(s);
+ final ColumnView oldColView = oldTbl.columns().get(s);
+
+ final Column newCol = newDesc.column(newColView.name());
+
+ if (oldColView == null) {
+ assert !newDesc.isKeyColumn(newCol.schemaIndex());
+
+ mapper.add(newCol.schemaIndex(), -1); // New column added: there is no source column in the old schema.
+
+ identity = false;
+ }
+ else {
+ // Old and new views share the named-list key; the column name itself may differ after a rename.
+ final Column oldCol = oldDesc.column(oldColView.name());
+
+ if (!newCol.type().equals(oldCol.type()) || newCol.nullable() != oldCol.nullable())
+ throw new InvalidTypeException("Column of incompatible type: [schemaVer=" + newDesc.version() + ", col=" + newCol);
+
+ mapper.add(newCol.schemaIndex(), oldCol.schemaIndex());
+
+ // Mapping is index-based: a rename or 'default' change matters only if the column index moved.
+ identity &= newCol.schemaIndex() == oldCol.schemaIndex();
+ }
+ }
+
+ final Optional<Column> droppedKeyCol = oldTbl.columns().namedListKeys().stream()
+ .filter(k -> newTbl.columns().get(k) == null)
+ .map(k -> oldDesc.column(oldTbl.columns().get(k).name()))
+ .filter(c -> oldDesc.isKeyColumn(c.schemaIndex()))
+ .findAny();
+
+ if (droppedKeyCol.isPresent())
+ throw new SchemaRegistryException("Dropping of key column is forbidden: [schemaVer=" + newDesc.version() + ", col=" + droppedKeyCol.get());
+
+ return identity ? ColumnMapper.identityMapping() : mapper;
+ }
+
+ /**
* Return table schema of certain version from history.
*
* @param tblId Table id.
diff --git a/modules/schema/src/main/java/org/apache/ignite/internal/schema/configuration/SchemaConfigurationConverter.java b/modules/schema/src/main/java/org/apache/ignite/internal/schema/configuration/SchemaConfigurationConverter.java
index b943f83..9b41a07 100644
--- a/modules/schema/src/main/java/org/apache/ignite/internal/schema/configuration/SchemaConfigurationConverter.java
+++ b/modules/schema/src/main/java/org/apache/ignite/internal/schema/configuration/SchemaConfigurationConverter.java
@@ -398,16 +398,6 @@ public class SchemaConfigurationConverter {
}
/**
- * Convert TableConfiguration to SchemaTable.
- *
- * @param tblCfg TableConfiguration to convert.
- * @return SchemaTable.
- */
- public static SchemaTable convert(TableConfiguration tblCfg) {
- return convert(tblCfg.value());
- }
-
- /**
* Convert configuration to SchemaTable.
*
* @param tblView TableView to convert.
diff --git a/modules/schema/src/main/java/org/apache/ignite/internal/schema/registry/SchemaRegistryImpl.java b/modules/schema/src/main/java/org/apache/ignite/internal/schema/registry/SchemaRegistryImpl.java
index a2fac90..b934462 100644
--- a/modules/schema/src/main/java/org/apache/ignite/internal/schema/registry/SchemaRegistryImpl.java
+++ b/modules/schema/src/main/java/org/apache/ignite/internal/schema/registry/SchemaRegistryImpl.java
@@ -17,13 +17,9 @@
package org.apache.ignite.internal.schema.registry;
-import java.util.NoSuchElementException;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.function.Function;
import org.apache.ignite.internal.schema.BinaryRow;
-import org.apache.ignite.internal.schema.Column;
-import org.apache.ignite.internal.schema.Columns;
-import org.apache.ignite.internal.schema.InvalidTypeException;
import org.apache.ignite.internal.schema.Row;
import org.apache.ignite.internal.schema.SchemaDescriptor;
import org.apache.ignite.internal.schema.SchemaRegistry;
@@ -110,42 +106,12 @@ public class SchemaRegistryImpl implements SchemaRegistry {
if (curSchema.version() == rowSchema.version())
return new Row(rowSchema, row);
- return new UpgradingRowAdapter(curSchema, row, columnMapper(curSchema, rowSchema));
- }
-
- /**
- * Create column mapping for schemas.
- *
- * @param src Source schema of newer version.
- * @param dst Target schema of older version.
- * @return Column mapping.
- */
- private ColumnMapping columnMapper(SchemaDescriptor src, SchemaDescriptor dst) {
- assert src.version() > dst.version();
- assert src.version() == dst.version() + 1; // TODO: IGNITE-14863 implement merged mapper for arbitraty schema versions.
-
- final Columns srcCols = src.valueColumns();
- final Columns dstCols = dst.valueColumns();
+ assert curSchema.version() >= rowSchema.version();
- final ColumnMapping mapping = new ColumnMapping(src);
-
- for (int i = 0; i < srcCols.columns().length; i++) {
- final Column col = srcCols.column(i);
-
- try {
- final int idx = dstCols.columnIndex(col.name());
-
- if (!col.equals(dstCols.column(idx)))
- throw new InvalidTypeException("Column of incompatible type: [colIdx=" + col.schemaIndex() + ", schemaVer=" + src.version());
-
- mapping.add(col.schemaIndex(), dst.keyColumns().length() + idx);
- }
- catch (NoSuchElementException ex) {
- mapping.add(col.schemaIndex(), -1);
- }
- }
+ // TODO: IGNITE-14864: implement merged mapper for arbitrary schema versions.
+ assert curSchema.version() == rowSchema.version() + 1 : "Mapper merging is not supported yet.";
- return mapping;
+ return new UpgradingRowAdapter(curSchema, row, curSchema.columnMapper());
}
/**
diff --git a/modules/schema/src/main/java/org/apache/ignite/internal/schema/registry/UpgradingRowAdapter.java b/modules/schema/src/main/java/org/apache/ignite/internal/schema/registry/UpgradingRowAdapter.java
index 8c9a4fc..3fb0fd3 100644
--- a/modules/schema/src/main/java/org/apache/ignite/internal/schema/registry/UpgradingRowAdapter.java
+++ b/modules/schema/src/main/java/org/apache/ignite/internal/schema/registry/UpgradingRowAdapter.java
@@ -18,6 +18,7 @@
package org.apache.ignite.internal.schema.registry;
import org.apache.ignite.internal.schema.BinaryRow;
+import org.apache.ignite.internal.schema.ColumnMapping;
import org.apache.ignite.internal.schema.InvalidTypeException;
import org.apache.ignite.internal.schema.NativeTypeSpec;
import org.apache.ignite.internal.schema.Row;
@@ -43,9 +44,6 @@ class UpgradingRowAdapter extends Row {
/** {@inheritDoc} */
@Override protected long findColumn(int colIdx, NativeTypeSpec type) throws InvalidTypeException {
- if (schema.isKeyColumn(colIdx))
- return super.findColumn(colIdx, type);
-
int mapIdx = mapping.map(colIdx);
return (mapIdx < 0) ? Long.MIN_VALUE : super.findColumn(mapIdx, type);
diff --git a/modules/schema/src/test/java/org/apache/ignite/internal/schema/configuration/SchemaConfigurationConverterTest.java b/modules/schema/src/test/java/org/apache/ignite/internal/schema/configuration/SchemaConfigurationConverterTest.java
index 1690b5a..6b524cd 100644
--- a/modules/schema/src/test/java/org/apache/ignite/internal/schema/configuration/SchemaConfigurationConverterTest.java
+++ b/modules/schema/src/test/java/org/apache/ignite/internal/schema/configuration/SchemaConfigurationConverterTest.java
@@ -193,7 +193,7 @@ public class SchemaConfigurationConverterTest {
TableConfiguration tblCfg = confRegistry.getConfiguration(TablesConfiguration.KEY).tables()
.get(tbl.canonicalName());
- SchemaTable tbl2 = SchemaConfigurationConverter.convert(tblCfg);
+ SchemaTable tbl2 = SchemaConfigurationConverter.convert(tblCfg.value());
assertEquals(tbl.canonicalName(), tbl2.canonicalName());
assertEquals(tbl.indices().size(), tbl2.indices().size());
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
index 1937789..95dd2d5 100644
--- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
@@ -25,6 +25,7 @@ import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
+import java.util.Objects;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
@@ -34,6 +35,8 @@ import java.util.function.Consumer;
import java.util.stream.Collectors;
import org.apache.ignite.configuration.internal.ConfigurationManager;
import org.apache.ignite.configuration.internal.util.ConfigurationUtil;
+import org.apache.ignite.configuration.notifications.ConfigurationNotificationEvent;
+import org.apache.ignite.configuration.schemas.table.ColumnView;
import org.apache.ignite.configuration.schemas.table.TableChange;
import org.apache.ignite.configuration.schemas.table.TableView;
import org.apache.ignite.configuration.schemas.table.TablesConfiguration;
@@ -220,8 +223,21 @@ public class TableManager extends Producer<TableEvent, TableEventParameters> imp
assert newTbl.columns().namedListKeys() != null && oldTbl.columns().namedListKeys() != null;
- return newTbl.columns().namedListKeys().stream().anyMatch(c -> !oldTbl.columns().namedListKeys().contains(c)) ||
- oldTbl.columns().namedListKeys().stream().anyMatch(c -> !newTbl.columns().namedListKeys().contains(c));
+ if (!newTbl.columns().namedListKeys().equals(oldTbl.columns().namedListKeys()))
+ return true;
+
+ return newTbl.columns().namedListKeys().stream().anyMatch(k -> {
+ final ColumnView newCol = newTbl.columns().get(k);
+ final ColumnView oldCol = oldTbl.columns().get(k);
+
+ assert oldCol != null;
+
+ assert Objects.equals(newCol.type(), oldCol.type()) : "Columns type change is not supported.";
+ assert Objects.equals(newCol.nullable(), oldCol.nullable()) : "Column nullability change is not supported";
+
+ return !Objects.equals(newCol.name(), oldCol.name()) ||
+ !Objects.equals(newCol.defaultValue(), oldCol.defaultValue());
+ });
}).collect(Collectors.toSet()) :
Collections.emptySet();
@@ -229,7 +245,7 @@ public class TableManager extends Producer<TableEvent, TableEventParameters> imp
futs.addAll(startTables(tablesToStart, ctx.storageRevision(), ctx.newValue()));
if (!schemaChanged.isEmpty())
- futs.addAll(changeSchema(schemaChanged));
+ futs.addAll(changeSchema(ctx, schemaChanged));
if (!tablesToStop.isEmpty())
futs.addAll(stopTables(tablesToStop));
@@ -376,10 +392,14 @@ public class TableManager extends Producer<TableEvent, TableEventParameters> imp
/**
* Start tables routine.
*
+ * @param ctx Configuration change context.
* @param tbls Tables to start.
* @return Table creation futures.
*/
- private List<CompletableFuture<Boolean>> changeSchema(Set<String> tbls) {
+ private List<CompletableFuture<Boolean>> changeSchema(
+ ConfigurationNotificationEvent<NamedListView<TableView>> ctx,
+ Set<String> tbls
+ ) {
boolean hasMetastorageLocally = metaStorageMgr.hasMetastorageLocally(configurationMgr);
List<CompletableFuture<Boolean>> futs = new ArrayList<>();
@@ -392,7 +412,7 @@ public class TableManager extends Producer<TableEvent, TableEventParameters> imp
final int ver = tbl.schemaView().lastSchemaVersion() + 1;
if (hasMetastorageLocally)
- futs.add(schemaMgr.updateSchemaForTable(tblId, tblName));
+ futs.add(schemaMgr.updateSchemaForTable(tblId, tblName, ctx.oldValue().get(tblName), ctx.newValue().get(tblName)));
final CompletableFuture<SchemaEventParameters> schemaReadyFut = new CompletableFuture<>();