You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by zs...@apache.org on 2022/10/19 08:43:43 UTC
[ignite-3] branch main updated: IGNITE-17702 Move schema changes history from configuration to metastore - Fixes #1134.
This is an automated email from the ASF dual-hosted git repository.
zstan pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new 7400742500 IGNITE-17702 Move schema changes history from configuration to metastore - Fixes #1134.
7400742500 is described below
commit 74007425000255c3a65bfbe4bd36d73537488d3a
Author: zstan <st...@gmail.com>
AuthorDate: Wed Oct 19 11:39:54 2022 +0300
IGNITE-17702 Move schema changes history from configuration to metastore - Fixes #1134.
Signed-off-by: zstan <st...@gmail.com>
---
.../ignite/internal/index/IndexManagerTest.java | 2 +-
.../storage/ItRebalanceDistributedTest.java | 50 +++-
.../runner/app/AbstractSchemaChangeTest.java | 42 +--
.../internal/runner/app/ItDataSchemaSyncTest.java | 113 +++++++-
.../runner/app/ItIgniteNodeRestartTest.java | 2 +-
.../runner/app/ItTableApiContractTest.java | 38 +--
.../internal/runner/app/ItTablesApiTest.java | 17 +-
.../internal/sql/internal/InternalSchemaTest.java | 123 +++++++++
.../internal/test/WatchListenerInhibitor.java | 9 +
.../org/apache/ignite/internal/app/IgniteImpl.java | 2 +-
modules/schema/build.gradle | 1 +
modules/schema/pom.xml | 5 +
.../ignite/internal/schema/SchemaManager.java | 290 +++++++++++++++------
.../ignite/internal/schema/SchemaRegistry.java | 2 +-
.../apache/ignite/internal/schema/SchemaUtils.java | 40 +--
.../ExtendedTableConfigurationSchema.java | 9 +-
.../configuration/SchemaConfigurationSchema.java | 31 ---
.../schema/registry/SchemaRegistryImpl.java | 11 +-
.../schema/registry/UpgradingRowAdapter.java | 2 +-
.../schema/registry/SchemaRegistryImplTest.java | 11 +-
.../schema/registry/UpgradingRowAdapterTest.java | 7 +-
.../sql/engine/exec/ddl/DdlCommandHandler.java | 111 ++++----
.../internal/sql/engine/StopCalciteModuleTest.java | 2 +-
.../sql/engine/exec/MockedStructuresTest.java | 289 +-------------------
.../internal/table/distributed/TableManager.java | 88 ++-----
.../table/distributed/TableManagerTest.java | 50 ++--
26 files changed, 717 insertions(+), 630 deletions(-)
diff --git a/modules/index/src/test/java/org/apache/ignite/internal/index/IndexManagerTest.java b/modules/index/src/test/java/org/apache/ignite/internal/index/IndexManagerTest.java
index c8f183c317..e7e540cd9d 100644
--- a/modules/index/src/test/java/org/apache/ignite/internal/index/IndexManagerTest.java
+++ b/modules/index/src/test/java/org/apache/ignite/internal/index/IndexManagerTest.java
@@ -127,7 +127,7 @@ public class IndexManagerTest {
chg.changePrimaryKey(pk -> pk.changeColumns("c1").changeColocationColumns("c1"));
- ((ExtendedTableChange) chg).changeAssignments((byte) 1);
+ ((ExtendedTableChange) chg).changeAssignments((byte) 1).changeSchemaId(1);
})).get();
}
diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/configuration/storage/ItRebalanceDistributedTest.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/configuration/storage/ItRebalanceDistributedTest.java
index a952cd39a5..b98a97ba87 100644
--- a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/configuration/storage/ItRebalanceDistributedTest.java
+++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/configuration/storage/ItRebalanceDistributedTest.java
@@ -175,7 +175,10 @@ public class ItRebalanceDistributedTest {
assertEquals(1, nodes.get(0).clusterCfgMgr.configurationRegistry().getConfiguration(TablesConfiguration.KEY)
.tables().get("TBL1").replicas().value());
- await(nodes.get(0).tableManager.alterTableAsync("TBL1", ch -> ch.changeReplicas(2)));
+ await(nodes.get(0).tableManager.alterTableAsync("TBL1", ch -> {
+ ch.changeReplicas(2);
+ return true;
+ }));
waitPartitionAssignmentsSyncedToExpected(0, 2);
@@ -200,8 +203,15 @@ public class ItRebalanceDistributedTest {
assertEquals(1, nodes.get(0).clusterCfgMgr.configurationRegistry().getConfiguration(TablesConfiguration.KEY).tables()
.get("TBL1").replicas().value());
- await(nodes.get(0).tableManager.alterTableAsync("TBL1", ch -> ch.changeReplicas(2)));
- await(nodes.get(0).tableManager.alterTableAsync("TBL1", ch -> ch.changeReplicas(3)));
+ await(nodes.get(0).tableManager.alterTableAsync("TBL1", ch -> {
+ ch.changeReplicas(2);
+ return true;
+ }));
+
+ await(nodes.get(0).tableManager.alterTableAsync("TBL1", ch -> {
+ ch.changeReplicas(3);
+ return true;
+ }));
waitPartitionAssignmentsSyncedToExpected(0, 3);
@@ -226,9 +236,20 @@ public class ItRebalanceDistributedTest {
assertEquals(1, nodes.get(0).clusterCfgMgr.configurationRegistry().getConfiguration(TablesConfiguration.KEY).tables()
.get("TBL1").replicas().value());
- await(nodes.get(0).tableManager.alterTableAsync("TBL1", ch -> ch.changeReplicas(2)));
- await(nodes.get(0).tableManager.alterTableAsync("TBL1", ch -> ch.changeReplicas(3)));
- await(nodes.get(0).tableManager.alterTableAsync("TBL1", ch -> ch.changeReplicas(2)));
+ await(nodes.get(0).tableManager.alterTableAsync("TBL1", ch -> {
+ ch.changeReplicas(2);
+ return true;
+ }));
+
+ await(nodes.get(0).tableManager.alterTableAsync("TBL1", ch -> {
+ ch.changeReplicas(3);
+ return true;
+ }));
+
+ await(nodes.get(0).tableManager.alterTableAsync("TBL1", ch -> {
+ ch.changeReplicas(2);
+ return true;
+ }));
waitPartitionAssignmentsSyncedToExpected(0, 2);
@@ -277,7 +298,10 @@ public class ItRebalanceDistributedTest {
return false;
});
- await(nodes.get(0).tableManager.alterTableAsync("TBL1", ch -> ch.changeReplicas(3)));
+ await(nodes.get(0).tableManager.alterTableAsync("TBL1", ch -> {
+ ch.changeReplicas(3);
+ return true;
+ }));
countDownLatch.await();
@@ -308,7 +332,10 @@ public class ItRebalanceDistributedTest {
assertEquals(1, nodes.get(0).clusterCfgMgr.configurationRegistry().getConfiguration(TablesConfiguration.KEY)
.tables().get("TBL1").replicas().value());
- await(nodes.get(0).tableManager.alterTableAsync("TBL1", ch -> ch.changeReplicas(1)));
+ await(nodes.get(0).tableManager.alterTableAsync("TBL1", ch -> {
+ ch.changeReplicas(1);
+ return true;
+ }));
waitPartitionAssignmentsSyncedToExpected(0, 1);
@@ -331,7 +358,10 @@ public class ItRebalanceDistributedTest {
return false;
});
- await(nodes.get(0).tableManager.alterTableAsync("TBL1", ch -> ch.changeReplicas(3)));
+ await(nodes.get(0).tableManager.alterTableAsync("TBL1", ch -> {
+ ch.changeReplicas(3);
+ return true;
+ }));
waitPartitionAssignmentsSyncedToExpected(0, 3);
@@ -506,7 +536,7 @@ public class ItRebalanceDistributedTest {
metaStorageManager,
clusterService);
- schemaManager = new SchemaManager(registry, tablesCfg);
+ schemaManager = new SchemaManager(registry, tablesCfg, metaStorageManager);
tableManager = new TableManager(
name,
diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/AbstractSchemaChangeTest.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/AbstractSchemaChangeTest.java
index ee13ec017b..635322e8e2 100644
--- a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/AbstractSchemaChangeTest.java
+++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/AbstractSchemaChangeTest.java
@@ -232,11 +232,12 @@ abstract class AbstractSchemaChangeTest {
*/
protected static void renameColumn(List<Ignite> nodes, String oldName, String newName) {
await(((TableManager) nodes.get(0).tables()).alterTableAsync(TABLE,
- tblChanger -> tblChanger.changeColumns(
- colListChanger -> colListChanger
- .rename(IgniteNameUtils.parseSimpleName(oldName), IgniteNameUtils.parseSimpleName(newName))
- )
- ));
+ tblChanger -> {
+ tblChanger.changeColumns(
+ colListChanger -> colListChanger
+ .rename(IgniteNameUtils.parseSimpleName(oldName), IgniteNameUtils.parseSimpleName(newName)));
+ return true;
+ }));
}
/**
@@ -247,16 +248,16 @@ abstract class AbstractSchemaChangeTest {
* @param defSup Default value supplier.
*/
protected static void changeDefault(List<Ignite> nodes, String colName, Supplier<Object> defSup) {
- await(((TableManager) nodes.get(0).tables()).alterTableAsync(TABLE,
- tblChanger -> tblChanger.changeColumns(
- colListChanger -> colListChanger
- .update(
- IgniteNameUtils.parseSimpleName(colName),
- colChanger -> colChanger.changeDefaultValueProvider(colDefChange -> colDefChange.convert(
- ConstantValueDefaultChange.class).changeDefaultValue(defSup.get().toString()))
- )
- )
- ));
+ await(((TableManager) nodes.get(0).tables()).alterTableAsync(TABLE, tblChanger -> {
+ tblChanger.changeColumns(
+ colListChanger -> colListChanger
+ .update(
+ IgniteNameUtils.parseSimpleName(colName),
+ colChanger -> colChanger.changeDefaultValueProvider(colDefChange -> colDefChange.convert(
+ ConstantValueDefaultChange.class).changeDefaultValue(defSup.get().toString()))
+ ));
+ return true;
+ }));
}
/**
@@ -268,11 +269,12 @@ abstract class AbstractSchemaChangeTest {
*/
private static void assertColumnChangeFailed(List<Ignite> grid, String colName, Consumer<ColumnChange> colChanger) {
assertThrows(IgniteException.class, () ->
- await(((TableManager) grid.get(0).tables()).alterTableAsync(TABLE,
- tblChanger -> tblChanger.changeColumns(
- listChanger -> listChanger.update(IgniteNameUtils.parseSimpleName(colName), colChanger)
- )
- ))
+ await(((TableManager) grid.get(0).tables()).alterTableAsync(TABLE, tblChanger -> {
+ tblChanger.changeColumns(
+ listChanger ->
+ listChanger.update(IgniteNameUtils.parseSimpleName(colName), colChanger));
+ return true;
+ }))
);
}
diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItDataSchemaSyncTest.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItDataSchemaSyncTest.java
index 4b92592ef2..64622eece3 100644
--- a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItDataSchemaSyncTest.java
+++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItDataSchemaSyncTest.java
@@ -18,11 +18,13 @@
package org.apache.ignite.internal.runner.app;
import static java.util.stream.Collectors.toList;
+import static org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCondition;
import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
import java.util.ArrayList;
import java.util.List;
@@ -38,6 +40,7 @@ import org.apache.ignite.internal.testframework.IgniteAbstractTest;
import org.apache.ignite.internal.testframework.IgniteTestUtils;
import org.apache.ignite.internal.testframework.WorkDirectoryExtension;
import org.apache.ignite.internal.util.IgniteUtils;
+import org.apache.ignite.sql.ResultSet;
import org.apache.ignite.sql.Session;
import org.apache.ignite.table.Table;
import org.apache.ignite.table.Tuple;
@@ -97,7 +100,7 @@ public class ItDataSchemaSyncTest extends IgniteAbstractTest {
* Starts a cluster before every test started.
*/
@BeforeEach
- void beforeEach() throws Exception {
+ void beforeEach() {
List<CompletableFuture<Ignite>> futures = nodesBootstrapCfg.entrySet().stream()
.map(e -> IgnitionManager.start(e.getKey(), e.getValue(), workDir.resolve(e.getKey())))
.collect(toList());
@@ -125,6 +128,106 @@ public class ItDataSchemaSyncTest extends IgniteAbstractTest {
IgniteUtils.closeAll(closeables);
}
+ /**
+ * Tests that schema updates are correctly applied on a lagging node.
+ */
+ @Test
+ public void checkSchemasCorrectUpdate() throws Exception {
+ Ignite ignite0 = clusterNodes.get(0);
+ IgniteImpl ignite1 = (IgniteImpl) clusterNodes.get(1);
+ IgniteImpl ignite2 = (IgniteImpl) clusterNodes.get(2);
+
+ createTable(ignite0, TABLE_NAME);
+
+ TableImpl table = (TableImpl) ignite0.tables().table(TABLE_NAME);
+
+ assertEquals(1, table.schemaView().schema().version());
+
+ WatchListenerInhibitor listenerInhibitor = WatchListenerInhibitor.metastorageEventsInhibitor(ignite1);
+
+ listenerInhibitor.startInhibit();
+
+ alterTable(ignite0, TABLE_NAME);
+
+ table = (TableImpl) ignite2.tables().table(TABLE_NAME);
+
+ TableImpl table0 = table;
+ assertTrue(waitForCondition(() -> table0.schemaView().schema().version() == 2, 5_000));
+
+ table = (TableImpl) ignite1.tables().table(TABLE_NAME);
+
+ assertEquals(1, table.schemaView().schema().version());
+
+ String nodeToStop = ignite1.name();
+
+ IgnitionManager.stop(nodeToStop);
+
+ listenerInhibitor.stopWithoutResend();
+
+ CompletableFuture<Ignite> ignite1Fut = nodesBootstrapCfg.entrySet().stream()
+ .filter(k -> k.getKey().equals(nodeToStop))
+ .map(e -> IgnitionManager.start(e.getKey(), e.getValue(), workDir.resolve(e.getKey())))
+ .findFirst().get();
+
+ ignite1 = (IgniteImpl) ignite1Fut.get();
+
+ table = (TableImpl) ignite1.tables().table(TABLE_NAME);
+
+ TableImpl table1 = table;
+ assertTrue(waitForCondition(() -> table1.schemaView().schema().version() == 2, 5_000));
+ }
+
+ /**
+ * Tests that schemas are correctly restored after a node restart.
+ */
+ @Test
+ public void checkSchemasCorrectlyRestore() throws Exception {
+ Ignite ignite1 = clusterNodes.get(1);
+
+ sql(ignite1, "CREATE TABLE " + TABLE_NAME + "(key BIGINT PRIMARY KEY, valint1 INT, valint2 INT)");
+
+ for (int i = 0; i < 10; ++i) {
+ sql(ignite1, String.format("INSERT INTO " + TABLE_NAME + " VALUES(%d, %d, %d)", i, i, 2 * i));
+ }
+
+ sql(ignite1, "ALTER TABLE " + TABLE_NAME + " DROP COLUMN valint1");
+
+ sql(ignite1, "ALTER TABLE " + TABLE_NAME + " ADD COLUMN valint3 INT");
+
+ sql(ignite1, "ALTER TABLE " + TABLE_NAME + " ADD COLUMN valint4 INT");
+
+ String nodeToStop = ignite1.name();
+
+ IgnitionManager.stop(nodeToStop);
+
+ CompletableFuture<Ignite> ignite1Fut = nodesBootstrapCfg.entrySet().stream()
+ .filter(k -> k.getKey().equals(nodeToStop))
+ .map(e -> IgnitionManager.start(e.getKey(), e.getValue(), workDir.resolve(e.getKey())))
+ .findFirst().get();
+
+ ignite1 = ignite1Fut.get();
+
+ Session ses = ignite1.sql().createSession();
+
+ ResultSet res = ses.execute(null, "SELECT valint2 FROM tbl1");
+
+ for (int i = 0; i < 10; ++i) {
+ assertNotNull(res.next().iterator().next());
+ }
+
+ for (int i = 10; i < 20; ++i) {
+ sql(ignite1, String.format("INSERT INTO " + TABLE_NAME + " VALUES(%d, %d, %d, %d)", i, i, i, i));
+ }
+
+ sql(ignite1, "ALTER TABLE " + TABLE_NAME + " DROP COLUMN valint3");
+
+ sql(ignite1, "ALTER TABLE " + TABLE_NAME + " ADD COLUMN valint5 INT");
+
+ res = ses.execute(null, "SELECT sum(valint4) FROM tbl1");
+
+ assertEquals(res.next().iterator().next(), 10L * (10 + 19) / 2);
+ }
+
/**
* The test executes various operation over the lagging node. The operations can be executed only the node overtakes a distributed
* cluster state.
@@ -154,7 +257,7 @@ public class ItDataSchemaSyncTest extends IgniteAbstractTest {
listenerInhibitor.startInhibit();
- sql(ignite0, "ALTER TABLE " + TABLE_NAME + " ADD COLUMN valstr2 VARCHAR NOT NULL DEFAULT 'default'");
+ alterTable(ignite0, TABLE_NAME);
for (Ignite node : clusterNodes) {
if (node == ignite1) {
@@ -163,7 +266,7 @@ public class ItDataSchemaSyncTest extends IgniteAbstractTest {
TableImpl tableOnNode = (TableImpl) node.tables().table(TABLE_NAME);
- IgniteTestUtils.waitForCondition(() -> tableOnNode.schemaView().lastSchemaVersion() == 2, 10_000);
+ waitForCondition(() -> tableOnNode.schemaView().lastSchemaVersion() == 2, 10_000);
}
TableImpl table1 = (TableImpl) ignite1.tables().table(TABLE_NAME);
@@ -236,6 +339,10 @@ public class ItDataSchemaSyncTest extends IgniteAbstractTest {
sql(node, "CREATE TABLE " + tableName + "(key BIGINT PRIMARY KEY, valint INT, valstr VARCHAR)");
}
+ protected void alterTable(Ignite node, String tableName) {
+ sql(node, "ALTER TABLE " + tableName + " ADD COLUMN valstr2 VARCHAR NOT NULL DEFAULT 'default'");
+ }
+
protected void sql(Ignite node, String query, Object... args) {
try (Session session = node.sql().createSession()) {
session.execute(null, query, args);
diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java
index cf47baecae..87adf836c1 100644
--- a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java
+++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java
@@ -278,7 +278,7 @@ public class ItIgniteNodeRestartTest extends IgniteAbstractTest {
TablesConfiguration tblCfg = clusterCfgMgr.configurationRegistry().getConfiguration(TablesConfiguration.KEY);
- SchemaManager schemaManager = new SchemaManager(registry, tblCfg);
+ SchemaManager schemaManager = new SchemaManager(registry, tblCfg, metaStorageMgr);
ReplicaService replicaSvc = new ReplicaService(
clusterSvc.messagingService(),
diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItTableApiContractTest.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItTableApiContractTest.java
index 8db106a9ba..88a242487c 100644
--- a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItTableApiContractTest.java
+++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItTableApiContractTest.java
@@ -122,21 +122,23 @@ public class ItTableApiContractTest extends AbstractBasicIntegrationTest {
.changeReplicas(2)
.changePartitions(10)));
- await(tableManager().alterTableAsync(TABLE_NAME,
- chng -> chng.changeColumns(cols -> {
+ await(tableManager().alterTableAsync(TABLE_NAME, chng -> {
+ chng.changeColumns(cols ->
cols.create("NAME", colChg -> convert(SchemaBuilders.column("name", ColumnType.string()).asNullable(true)
- .withDefaultValue("default").build(), colChg));
- })));
+ .withDefaultValue("default").build(), colChg)));
+ return true;
+ }));
assertNotNull(ignite.tables().table(TABLE_NAME));
assertNull(ignite.tables().table(TABLE_NAME + "_not_exist"));
- assertThrows(TableNotFoundException.class, () -> await(tableManager().alterTableAsync(TABLE_NAME + "_not_exist",
- chng -> chng.changeColumns(cols -> {
+ assertThrows(TableNotFoundException.class, () -> await(tableManager().alterTableAsync(TABLE_NAME + "_not_exist", chng -> {
+ chng.changeColumns(cols ->
cols.create("NAME", colChg -> convert(SchemaBuilders.column("name", ColumnType.string()).asNullable(true)
- .withDefaultValue("default").build(), colChg));
- }))));
+ .withDefaultValue("default").build(), colChg)));
+ return true;
+ })));
}
/**
@@ -156,16 +158,20 @@ public class ItTableApiContractTest extends AbstractBasicIntegrationTest {
.changePartitions(10)));
CompletableFuture<Void> altTblFut1 = tableManager().alterTableAsync(TABLE_NAME,
- chng -> chng.changeColumns(cols -> {
- cols.create("NAME", colChg -> convert(SchemaBuilders.column("NAME", ColumnType.string()).asNullable(true)
- .withDefaultValue("default").build(), colChg));
- }));
+ chng -> {
+ chng.changeColumns(cols ->
+ cols.create("NAME", colChg -> convert(SchemaBuilders.column("NAME",
+ ColumnType.string()).asNullable(true).withDefaultValue("default").build(), colChg)));
+ return true;
+ });
CompletableFuture<Void> altTblFut2 = tableManager().alterTableAsync(TABLE_NAME + "_not_exist",
- chng -> chng.changeColumns(cols -> {
- cols.create("NAME", colChg -> convert(SchemaBuilders.column("NAME", ColumnType.string()).asNullable(true)
- .withDefaultValue("default").build(), colChg));
- }));
+ chng -> {
+ chng.changeColumns(cols ->
+ cols.create("NAME", colChg -> convert(SchemaBuilders.column("NAME",
+ ColumnType.string()).asNullable(true).withDefaultValue("default").build(), colChg)));
+ return true;
+ });
assertNotNull(ignite.tables().table(TABLE_NAME));
diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItTablesApiTest.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItTablesApiTest.java
index 069cd4713a..ba9eb80043 100644
--- a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItTablesApiTest.java
+++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItTablesApiTest.java
@@ -543,13 +543,16 @@ public class ItTablesApiTest extends IgniteAbstractTest {
private void addColumnInternal(Ignite node, String tableName, ColumnDefinition colDefinition) {
await(((TableManager) node.tables()).alterTableAsync(
tableName,
- chng -> chng.changeColumns(cols -> {
- try {
- cols.create(colDefinition.name(), colChg -> convert(colDefinition, colChg));
- } catch (IllegalArgumentException e) {
- throw new ColumnAlreadyExistsException(colDefinition.name());
- }
- })));
+ chng -> {
+ chng.changeColumns(cols -> {
+ try {
+ cols.create(colDefinition.name(), colChg -> convert(colDefinition, colChg));
+ } catch (IllegalArgumentException e) {
+ throw new ColumnAlreadyExistsException(colDefinition.name());
+ }
+ });
+ return true;
+ }));
}
/**
diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/internal/InternalSchemaTest.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/internal/InternalSchemaTest.java
new file mode 100644
index 0000000000..aab4b1e9af
--- /dev/null
+++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/internal/InternalSchemaTest.java
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.sql.internal;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import org.apache.ignite.Ignite;
+import org.apache.ignite.internal.configuration.ConfigurationManager;
+import org.apache.ignite.internal.schema.configuration.ExtendedTableView;
+import org.apache.ignite.internal.schema.configuration.TablesConfiguration;
+import org.apache.ignite.internal.sql.engine.AbstractBasicIntegrationTest;
+import org.apache.ignite.internal.testframework.IgniteTestUtils;
+import org.apache.ignite.sql.IgniteSql;
+import org.apache.ignite.sql.ResultSet;
+import org.apache.ignite.sql.Session;
+import org.junit.jupiter.api.Test;
+
+/** Tests for internal manipulations with schema. */
+public class InternalSchemaTest extends AbstractBasicIntegrationTest {
+ /**
+ * Checks that the schema version is updated even if some of the added columns already exist in the table.
+ */
+ @Test
+ public void checkSchemaUpdatedWithEqAlterColumn() {
+ IgniteSql sql = igniteSql();
+ Session ses = sql.createSession();
+
+ checkDdl(true, ses, "CREATE TABLE TEST(ID INT PRIMARY KEY, VAL0 INT)");
+
+ Ignite node = CLUSTER_NODES.get(0);
+
+ ConfigurationManager cfgMgr = IgniteTestUtils.getFieldValue(node, "clusterCfgMgr");
+
+ final TablesConfiguration tablesConfiguration = cfgMgr.configurationRegistry().getConfiguration(TablesConfiguration.KEY);
+
+ int schIdBefore = ((ExtendedTableView) tablesConfiguration.tables().get("TEST").value()).schemaId();
+
+ checkDdl(false, ses, "ALTER TABLE TEST ADD COLUMN IF NOT EXISTS (VAL0 INT, VAL1 INT)");
+
+ int schIdAfter = ((ExtendedTableView) tablesConfiguration.tables().get("TEST").value()).schemaId();
+
+ assertEquals(schIdBefore + 1, schIdAfter);
+ }
+
+ /** Tests that the schema is mapped correctly after columns are dropped. */
+ @Test
+ public void testDropColumns() {
+ IgniteSql sql = igniteSql();
+ Session ses = sql.createSession();
+
+ checkDdl(true, ses, "CREATE TABLE my (c1 INT PRIMARY KEY, c2 INT, c3 VARCHAR)");
+
+ ses.execute(
+ null,
+ "INSERT INTO my VALUES (1, 2, '3')"
+ );
+
+ ResultSet res = ses.execute(
+ null,
+ "SELECT c1, c3 FROM my"
+ );
+
+ assertTrue(res.hasNext());
+
+ checkDdl(true, ses, "ALTER TABLE my DROP COLUMN c2");
+
+ res = ses.execute(
+ null,
+ "SELECT c1, c3 FROM my"
+ );
+
+ assertNotNull(res.next());
+
+ checkDdl(true, ses, "ALTER TABLE my ADD COLUMN (c2 INT, c4 VARCHAR)");
+
+ res = ses.execute(
+ null,
+ "SELECT c1, c3 FROM my"
+ );
+
+ assertNotNull(res.next());
+ }
+
+ private static void checkDdl(boolean expectedApplied, Session ses, String sql) {
+ ResultSet res = ses.execute(
+ null,
+ sql
+ );
+
+ assertEquals(expectedApplied, res.wasApplied());
+ assertFalse(res.hasRowSet());
+ assertEquals(-1, res.affectedRows());
+
+ res.close();
+ }
+
+ /**
+ * Gets the SQL API.
+ *
+ * @return SQL API.
+ */
+ protected IgniteSql igniteSql() {
+ return CLUSTER_NODES.get(0).sql();
+ }
+}
diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/test/WatchListenerInhibitor.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/test/WatchListenerInhibitor.java
index f6577fc689..a122ffe652 100644
--- a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/test/WatchListenerInhibitor.java
+++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/test/WatchListenerInhibitor.java
@@ -148,4 +148,13 @@ public class WatchListenerInhibitor implements WatchListener {
inhibitEvents.clear();
}
+
+ /**
+ * Stops inhibiting silently: the accumulated events are dropped and are not resent.
+ */
+ public synchronized void stopWithoutResend() {
+ inhibit = false;
+
+ inhibitEvents.clear();
+ }
}
diff --git a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
index f8cf815bf1..621a8ebdde 100644
--- a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
+++ b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
@@ -359,7 +359,7 @@ public class IgniteImpl implements Ignite {
)
);
- schemaManager = new SchemaManager(registry, tablesConfiguration);
+ schemaManager = new SchemaManager(registry, tablesConfiguration, metaStorageMgr);
volatileLogStorageFactoryCreator = new VolatileLogStorageFactoryCreator(workDir.resolve("volatile-log-spillout"));
diff --git a/modules/schema/build.gradle b/modules/schema/build.gradle
index 04cea165e0..f3086f3765 100644
--- a/modules/schema/build.gradle
+++ b/modules/schema/build.gradle
@@ -29,6 +29,7 @@ dependencies {
implementation project(':ignite-bytecode')
implementation project(':ignite-core')
implementation project(':ignite-configuration')
+ implementation project(':ignite-metastorage')
implementation libs.jetbrains.annotations
testAnnotationProcessor project(':ignite-configuration-annotation-processor')
diff --git a/modules/schema/pom.xml b/modules/schema/pom.xml
index 485723eb2e..9131612929 100644
--- a/modules/schema/pom.xml
+++ b/modules/schema/pom.xml
@@ -58,6 +58,11 @@
<artifactId>ignite-binary-tuple</artifactId>
</dependency>
+ <dependency>
+ <groupId>org.apache.ignite</groupId>
+ <artifactId>ignite-metastorage</artifactId>
+ </dependency>
+
<!-- 3rd party dependencies -->
<dependency>
<groupId>org.jetbrains</groupId>
diff --git a/modules/schema/src/main/java/org/apache/ignite/internal/schema/SchemaManager.java b/modules/schema/src/main/java/org/apache/ignite/internal/schema/SchemaManager.java
index d5e70fb272..e6cb7a0c21 100644
--- a/modules/schema/src/main/java/org/apache/ignite/internal/schema/SchemaManager.java
+++ b/modules/schema/src/main/java/org/apache/ignite/internal/schema/SchemaManager.java
@@ -19,42 +19,43 @@ package org.apache.ignite.internal.schema;
import static java.util.concurrent.CompletableFuture.completedFuture;
import static java.util.concurrent.CompletableFuture.failedFuture;
-import static org.apache.ignite.internal.configuration.util.ConfigurationUtil.getByInternalId;
import static org.apache.ignite.internal.util.IgniteUtils.inBusyLock;
import static org.apache.ignite.lang.ErrorGroups.Common.NODE_STOPPING_ERR;
import java.util.HashMap;
import java.util.Map;
-import java.util.NoSuchElementException;
+import java.util.SortedMap;
+import java.util.TreeMap;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
import java.util.function.Function;
-import org.apache.ignite.configuration.ConfigurationProperty;
import org.apache.ignite.configuration.NamedListView;
-import org.apache.ignite.configuration.notifications.ConfigurationNamedListListener;
import org.apache.ignite.configuration.notifications.ConfigurationNotificationEvent;
import org.apache.ignite.internal.causality.VersionedValue;
-import org.apache.ignite.internal.configuration.util.ConfigurationUtil;
import org.apache.ignite.internal.manager.IgniteComponent;
import org.apache.ignite.internal.manager.Producer;
+import org.apache.ignite.internal.metastorage.MetaStorageManager;
+import org.apache.ignite.internal.metastorage.client.Conditions;
+import org.apache.ignite.internal.metastorage.client.Entry;
+import org.apache.ignite.internal.metastorage.client.Operations;
+import org.apache.ignite.internal.schema.configuration.ColumnView;
import org.apache.ignite.internal.schema.configuration.ExtendedTableConfiguration;
-import org.apache.ignite.internal.schema.configuration.SchemaConfiguration;
-import org.apache.ignite.internal.schema.configuration.SchemaView;
+import org.apache.ignite.internal.schema.configuration.ExtendedTableView;
import org.apache.ignite.internal.schema.configuration.TablesConfiguration;
import org.apache.ignite.internal.schema.event.SchemaEvent;
import org.apache.ignite.internal.schema.event.SchemaEventParameters;
import org.apache.ignite.internal.schema.marshaller.schema.SchemaSerializerImpl;
import org.apache.ignite.internal.schema.registry.SchemaRegistryImpl;
+import org.apache.ignite.internal.util.Cursor;
import org.apache.ignite.internal.util.IgniteSpinBusyLock;
+import org.apache.ignite.lang.ByteArray;
import org.apache.ignite.lang.IgniteException;
import org.apache.ignite.lang.IgniteInternalException;
import org.apache.ignite.lang.IgniteStringFormatter;
-import org.apache.ignite.lang.IgniteSystemProperties;
import org.apache.ignite.lang.IgniteTriConsumer;
import org.apache.ignite.lang.NodeStoppingException;
-import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
/**
@@ -64,13 +65,8 @@ public class SchemaManager extends Producer<SchemaEvent, SchemaEventParameters>
/** Initial version for schemas. */
public static final int INITIAL_SCHEMA_VERSION = 1;
- /**
- * If this property is set to {@code true} then an attempt to get the configuration property directly from the meta storage will be
- * skipped, and the local property will be returned.
- * TODO: IGNITE-16774 This property and overall approach, access configuration directly through the Metostorage,
- * TODO: will be removed after fix of the issue.
- */
- private final boolean getMetadataLocallyOnly = IgniteSystemProperties.getBoolean("IGNITE_GET_METADATA_LOCALLY_ONLY");
+ /** Schema history key predicate part. */
+ public static final String SCHEMA_STORE_PREFIX = ".sch-hist.";
/** Busy lock to stop synchronously. */
private final IgniteSpinBusyLock busyLock = new IgniteSpinBusyLock();
@@ -84,50 +80,112 @@ public class SchemaManager extends Producer<SchemaEvent, SchemaEventParameters>
/** Versioned store for tables by name. */
private final VersionedValue<Map<UUID, SchemaRegistryImpl>> registriesVv;
+ /** Meta storage manager. */
+ private final MetaStorageManager metastorageMgr;
+
/**
* Constructor.
*
* @param registry Registration closure handed to the {@link VersionedValue} that stores the schema registries.
* @param tablesCfg Tables configuration.
* @param metastorageMgr Meta storage manager used to read and write the schema history entries.
*/
- public SchemaManager(Consumer<Function<Long, CompletableFuture<?>>> registry, TablesConfiguration tablesCfg) {
+ public SchemaManager(
+ Consumer<Function<Long, CompletableFuture<?>>> registry,
+ TablesConfiguration tablesCfg,
+ MetaStorageManager metastorageMgr
+ ) {
this.registriesVv = new VersionedValue<>(registry, HashMap::new);
-
this.tablesCfg = tablesCfg;
+ this.metastorageMgr = metastorageMgr;
}
/** {@inheritDoc} */
@Override
public void start() {
- ((ExtendedTableConfiguration) tablesCfg.tables().any()).schemas().listenElements(new ConfigurationNamedListListener<>() {
- @Override
- public CompletableFuture<?> onCreate(ConfigurationNotificationEvent<SchemaView> schemasCtx) {
- return onSchemaCreate(schemasCtx);
+ for (String tblName : tablesCfg.tables().value().namedListKeys()) { // Replay the stored schema history of every configured table.
+ ExtendedTableConfiguration tblCfg = ((ExtendedTableConfiguration) tablesCfg.tables().get(tblName));
+ UUID tblId = tblCfg.id().value();
+
+ Map<Integer, byte[]> schemas = collectAllSchemas(tblId); // Serialized descriptors keyed by schema version.
+
+ byte[] serialized;
+
+ if (!schemas.isEmpty()) {
+ for (Map.Entry<Integer, byte[]> ent : schemas.entrySet()) { // collectAllSchemas returns a sorted map, so versions are visited in ascending order.
+ serialized = ent.getValue();
+
+ SchemaDescriptor desc = SchemaSerializerImpl.INSTANCE.deserialize(serialized);
+
+ createSchema(0, tblId, tblName, desc).join(); // Blocks on the returned future; 0 is passed in the causality token position.
+ }
+
+ registriesVv.complete(0); // NOTE(review): runs once per table that has stored schemas - confirm that completing token 0 repeatedly is intended.
+ } else {
+ serialized = schemas.get(INITIAL_SCHEMA_VERSION); // NOTE(review): the map is empty in this branch, so this lookup always yields null.
+
+ assert serialized != null; // NOTE(review): consequently always fails when assertions are enabled and is a no-op otherwise.
}
- });
+ }
+
+ tablesCfg.tables().any().columns().listen(this::onSchemaChange); // Column changes of any table are handled by onSchemaChange.
}
/**
* Listener of schema configuration changes.
*
- * @param schemasCtx Schemas configuration context.
+ * @param ctx Configuration context.
* @return A future.
*/
- private CompletableFuture<?> onSchemaCreate(ConfigurationNotificationEvent<SchemaView> schemasCtx) {
+ private CompletableFuture<?> onSchemaChange(ConfigurationNotificationEvent<NamedListView<ColumnView>> ctx) {
if (!busyLock.enterBusy()) {
return failedFuture(new IgniteInternalException(NODE_STOPPING_ERR, new NodeStoppingException()));
}
try {
- long causalityToken = schemasCtx.storageRevision();
+ ExtendedTableView tblCfg = (ExtendedTableView) ctx.config(ExtendedTableConfiguration.class).value();
- ExtendedTableConfiguration tblCfg = schemasCtx.config(ExtendedTableConfiguration.class);
+ int verFromUpdate = tblCfg.schemaId();
- UUID tblId = tblCfg.id().value();
+ UUID tblId = tblCfg.id();
- String tableName = tblCfg.name().value();
+ String tableName = tblCfg.name();
- SchemaDescriptor schemaDescriptor = SchemaSerializerImpl.INSTANCE.deserialize((schemasCtx.newValue().schema()));
+ SchemaDescriptor schemaDescFromUpdate = SchemaUtils.prepareSchemaDescriptor(verFromUpdate, tblCfg);
- CompletableFuture<?> createSchemaFut = createSchema(causalityToken, tblId, tableName, schemaDescriptor);
+ if (searchSchemaByVersion(tblId, schemaDescFromUpdate.version()) != null) {
+ return completedFuture(null);
+ }
- registriesVv.get(causalityToken).thenRun(() -> inBusyLock(busyLock,
- () -> fireEvent(SchemaEvent.CREATE, new SchemaEventParameters(causalityToken, tblId, schemaDescriptor))));
+ if (verFromUpdate != INITIAL_SCHEMA_VERSION) {
+ SchemaDescriptor oldSchema = searchSchemaByVersion(tblId, verFromUpdate - 1);
+ assert oldSchema != null;
+
+ NamedListView<ColumnView> oldCols = ctx.oldValue();
+ NamedListView<ColumnView> newCols = ctx.newValue();
+
+ schemaDescFromUpdate.columnMapping(SchemaUtils.columnMapper(
+ oldSchema,
+ oldCols,
+ schemaDescFromUpdate,
+ newCols));
+ }
+
+ long causalityToken = ctx.storageRevision();
+
+ CompletableFuture<?> createSchemaFut = createSchema(causalityToken, tblId, tableName, schemaDescFromUpdate);
+
+ try {
+ final ByteArray key = schemaWithVerHistKey(tblId, verFromUpdate);
+
+ createSchemaFut.thenCompose(t -> metastorageMgr.invoke(
+ Conditions.notExists(key),
+ Operations.put(key, SchemaSerializerImpl.INSTANCE.serialize(schemaDescFromUpdate)),
+ Operations.noop()));
+ } catch (Throwable th) {
+ createSchemaFut.completeExceptionally(th);
+ }
+
+ createSchemaFut.whenComplete((ignore, th) -> {
+ if (th == null) {
+ registriesVv.get(causalityToken).thenRun(() -> inBusyLock(busyLock,
+ () -> fireEvent(SchemaEvent.CREATE, new SchemaEventParameters(causalityToken, tblId, schemaDescFromUpdate))));
+ }
+ });
return createSchemaFut;
} finally {
@@ -237,18 +295,32 @@ public class SchemaManager extends Producer<SchemaEvent, SchemaEventParameters>
* @param schemaVer Schema version.
* @return Schema descriptor.
*/
- private SchemaDescriptor tableSchema(UUID tblId, String tableName, int schemaVer) {
+ private CompletableFuture<SchemaDescriptor> tableSchema(UUID tblId, String tableName, int schemaVer) {
ExtendedTableConfiguration tblCfg = ((ExtendedTableConfiguration) tablesCfg.tables().get(tableName));
+ CompletableFuture<SchemaDescriptor> fut = new CompletableFuture<>();
+
if (checkSchemaVersion(tblId, schemaVer)) {
- return getSchemaDescriptorLocally(schemaVer, tblCfg);
- }
+ SchemaDescriptor desc = searchSchemaByVersion(tblId, schemaVer);
- CompletableFuture<SchemaDescriptor> fut = new CompletableFuture<>();
+ if (desc == null) {
+ return getSchemaDescriptor(schemaVer, tblCfg);
+ } else {
+ fut.complete(desc);
+ return fut;
+ }
+ }
IgniteTriConsumer<Long, Map<UUID, SchemaRegistryImpl>, Throwable> schemaListener = (token, regs, e) -> {
if (schemaVer <= regs.get(tblId).lastSchemaVersion()) {
- fut.complete(getSchemaDescriptorLocally(schemaVer, tblCfg));
+ getSchemaDescriptor(schemaVer, tblCfg)
+ .whenComplete((desc, th) -> {
+ if (th != null) {
+ fut.completeExceptionally(th);
+ } else {
+ fut.complete(desc);
+ }
+ });
}
};
@@ -259,10 +331,13 @@ public class SchemaManager extends Producer<SchemaEvent, SchemaEventParameters>
if (checkSchemaVersion(tblId, schemaVer)) {
registriesVv.removeWhenComplete(schemaListener);
- return getSchemaDescriptorLocally(schemaVer, tblCfg);
+ return getSchemaDescriptor(schemaVer, tblCfg);
}
- return fut.whenComplete((unused, throwable) -> registriesVv.removeWhenComplete(schemaListener)).join();
+ return fut.thenApply(res -> {
+ registriesVv.removeWhenComplete(schemaListener);
+ return res;
+ });
}
/**
@@ -281,59 +356,34 @@ public class SchemaManager extends Producer<SchemaEvent, SchemaEventParameters>
}
/**
- * Checks that the schema is configured in the Metasorage consensus.
+ * Tries to find the schema in the latest locally known registry of the table; the meta storage is not queried.
*
* @param tblId Table id.
* @param schemaVer Schema version.
- * @return True when the schema configured, false otherwise.
+ * @return Descriptor if the table has a registry whose last version is not lower than {@code schemaVer}, or {@code null} otherwise.
*/
- private boolean isSchemaExists(UUID tblId, int schemaVer) {
- return latestSchemaVersion(tblId) >= schemaVer;
- }
-
- /**
- * Gets the latest version of the table schema which available in Metastore.
- *
- * @param tblId Table id.
- * @return The latest schema version.
- */
- private int latestSchemaVersion(UUID tblId) {
- try {
- NamedListView<SchemaView> tblSchemas = ((ExtendedTableConfiguration) getByInternalId(directProxy(tablesCfg.tables()), tblId))
- .schemas().value();
-
- int lastVer = INITIAL_SCHEMA_VERSION;
-
- for (String schemaVerAsStr : tblSchemas.namedListKeys()) {
- int ver = Integer.parseInt(schemaVerAsStr);
-
- if (ver > lastVer) {
- lastVer = ver;
- }
- }
-
- return lastVer;
- } catch (NoSuchElementException e) {
- assert false : "Table must exist. [tableId=" + tblId + ']';
+ private SchemaDescriptor searchSchemaByVersion(UUID tblId, int schemaVer) {
+ SchemaRegistry registry = registriesVv.latest().get(tblId); // No registry is present for a table that has not been registered yet.
- return INITIAL_SCHEMA_VERSION;
+ if (registry != null && schemaVer <= registry.lastSchemaVersion()) { // Only versions up to the last registered one are looked up.
+ return registry.schema(schemaVer);
+ } else {
+ return null;
}
}
/**
- * Gets a schema descriptor from the local node configuration storage.
+ * Gets a schema descriptor from the schema history kept in the meta storage.
+ *
+ * <p>The entry is read asynchronously by its schema history key and deserialized once the read completes.
*
* @param schemaVer Schema version.
* @param tblCfg Table configuration.
* @return Schema descriptor.
*/
- @NotNull
- private SchemaDescriptor getSchemaDescriptorLocally(int schemaVer, ExtendedTableConfiguration tblCfg) {
- SchemaConfiguration schemaCfg = tblCfg.schemas().get(String.valueOf(schemaVer));
+ private CompletableFuture<SchemaDescriptor> getSchemaDescriptor(int schemaVer, ExtendedTableConfiguration tblCfg) {
+ CompletableFuture<Entry> ent = metastorageMgr.get(
+ schemaWithVerHistKey(tblCfg.id().value(), schemaVer));
- assert schemaCfg != null;
-
- return SchemaSerializerImpl.INSTANCE.deserialize(schemaCfg.schema().value());
+ return ent.thenApply(e -> SchemaSerializerImpl.INSTANCE.deserialize(e.value())); // NOTE(review): e.value() is passed on unchecked - confirm what a missing key yields.
}
/**
@@ -400,15 +450,83 @@ public class SchemaManager extends Producer<SchemaEvent, SchemaEventParameters>
}
/**
- * Gets a direct accessor for the configuration distributed property.
- * If the metadata access only locally configured the method will return local property accessor.
+ * Gets the latest version of the table schema which is available in the Metastore.
+ *
+ * <p>Iterates over all meta storage entries stored under the schema history prefix of the table and returns the greatest
+ * version found, or {@link #INITIAL_SCHEMA_VERSION} when no entry has a greater version.
+ *
+ * @param tblId Table id.
+ * @return The latest schema version.
+ */
+ private int latestSchemaVersion(UUID tblId) {
+ try {
+ Cursor<Entry> cur = metastorageMgr.prefix(schemaHistPrefix(tblId)); // NOTE(review): the cursor is never closed in this method - confirm whether it has to be.
+
+ int lastVer = INITIAL_SCHEMA_VERSION;
+
+ for (Entry ent : cur) {
+ String key = ent.key().toString();
+ int descVer = extractVerFromSchemaKey(key); // The version is the part of the key after the last '.'.
+
+ if (descVer > lastVer) {
+ lastVer = descVer;
+ }
+ }
+
+ return lastVer;
+ } catch (NodeStoppingException e) {
+ throw new IgniteException(e.traceId(), e.code(), e.getMessage(), e); // Rethrown unchecked with the same trace id, code and message.
+ }
+ }
+
+ /**
+ * Collects all serialized schemas stored in the meta storage for the given table.
*
- * @param property Distributed configuration property to receive direct access.
- * @param <T> Type of the property accessor.
- * @return An accessor for distributive property.
- * @see #getMetadataLocallyOnly
+ * @param tblId Table id.
+ * @return Serialized schemas keyed by schema version, sorted by version; empty if nothing is stored under the table's prefix.
+ */
+ private SortedMap<Integer, byte[]> collectAllSchemas(UUID tblId) {
+ try {
+ Cursor<Entry> cur = metastorageMgr.prefix(schemaHistPrefix(tblId)); // NOTE(review): the cursor is never closed in this method - confirm whether it has to be.
+
+ SortedMap<Integer, byte[]> schemes = new TreeMap<>();
+
+ for (Entry ent : cur) {
+ String key = ent.key().toString();
+ int descVer = extractVerFromSchemaKey(key); // The version is the part of the key after the last '.'.
+
+ schemes.put(descVer, ent.value());
+ }
+
+ return schemes;
+ } catch (NodeStoppingException e) {
+ throw new IgniteException(e.traceId(), e.code(), e.getMessage(), e); // Rethrown unchecked with the same trace id, code and message.
+ }
+ }
+
+ private int extractVerFromSchemaKey(String key) { // Parses the schema version that follows the last '.' of a schema history key.
+ int pos = key.lastIndexOf('.');
+ assert pos != -1 : "Unexpected key: " + key;
+
+ key = key.substring(pos + 1); // Keep only the text after the last dot.
+ return Integer.parseInt(key); // Throws NumberFormatException if that text is not a decimal integer.
+ }
+
+ /**
+ * Forms the schema history key of a single schema version: the table id, followed by {@link #SCHEMA_STORE_PREFIX},
+ * followed by the version number.
+ *
+ * @param tblId Table id.
+ * @param ver Schema version.
+ * @return {@link ByteArray} representation.
+ */
+ private static ByteArray schemaWithVerHistKey(UUID tblId, int ver) {
+ return ByteArray.fromString(tblId + SCHEMA_STORE_PREFIX + ver);
+ }
+
+ /**
+ * Forms the key prefix shared by all schema history entries of a table: the table id followed by
+ * {@link #SCHEMA_STORE_PREFIX}.
+ *
+ * @param tblId Table id.
+ * @return {@link ByteArray} representation.
*/
- private <T extends ConfigurationProperty<?>> T directProxy(T property) {
- return getMetadataLocallyOnly ? property : ConfigurationUtil.directProxy(property);
+ private static ByteArray schemaHistPrefix(UUID tblId) {
+ return ByteArray.fromString(tblId + SCHEMA_STORE_PREFIX);
}
}
diff --git a/modules/schema/src/main/java/org/apache/ignite/internal/schema/SchemaRegistry.java b/modules/schema/src/main/java/org/apache/ignite/internal/schema/SchemaRegistry.java
index c648de1531..ffaaa97fa5 100644
--- a/modules/schema/src/main/java/org/apache/ignite/internal/schema/SchemaRegistry.java
+++ b/modules/schema/src/main/java/org/apache/ignite/internal/schema/SchemaRegistry.java
@@ -61,7 +61,7 @@ public interface SchemaRegistry {
/**
* Get last registered schema version.
*/
- public int lastSchemaVersion();
+ int lastSchemaVersion();
/**
* Resolve binary row against given schema.
diff --git a/modules/schema/src/main/java/org/apache/ignite/internal/schema/SchemaUtils.java b/modules/schema/src/main/java/org/apache/ignite/internal/schema/SchemaUtils.java
index d0e942d13d..b211ff5e39 100644
--- a/modules/schema/src/main/java/org/apache/ignite/internal/schema/SchemaUtils.java
+++ b/modules/schema/src/main/java/org/apache/ignite/internal/schema/SchemaUtils.java
@@ -58,37 +58,39 @@ public class SchemaUtils {
) {
ColumnMapper mapper = null;
- // since newTblColumns comes from a Change class, it can only be of the same size or larger than the previous configuration,
- // because removed keys are simply replaced with nulls
- assert newTblColumns.size() >= oldTblColumns.size();
-
for (int i = 0; i < newTblColumns.size(); ++i) {
ColumnView newColView = newTblColumns.get(i);
- // new value can be null if a column has been deleted
- if (newColView == null) {
- continue;
- }
+ assert newColView != null;
+
+ Column newCol = newDesc.column(newColView.name());
if (i < oldTblColumns.size()) {
ColumnView oldColView = oldTblColumns.get(i);
- Column newCol = newDesc.column(newColView.name());
Column oldCol = oldDesc.column(oldColView.name());
if (newCol.schemaIndex() == oldCol.schemaIndex()) {
- continue;
+ if (!newCol.name().equals(oldCol.name())) {
+ if (mapper == null) {
+ mapper = ColumnMapping.createMapper(newDesc);
+ }
+
+ Column oldIdx = oldDesc.column(newColView.name());
+
+ // rename
+ if (oldIdx != null) {
+ mapper.add(newCol.schemaIndex(), oldIdx.schemaIndex());
+ }
+ }
+ } else {
+ if (mapper == null) {
+ mapper = ColumnMapping.createMapper(newDesc);
+ }
+
+ mapper.add(newCol.schemaIndex(), oldCol.schemaIndex());
}
-
- if (mapper == null) {
- mapper = ColumnMapping.createMapper(newDesc);
- }
-
- mapper.add(newCol.schemaIndex(), oldCol.schemaIndex());
} else {
- // if the new Named List is larger than the old one, it can only mean that a new column has been added
- Column newCol = newDesc.column(newColView.name());
-
assert !newDesc.isKeyColumn(newCol.schemaIndex());
if (mapper == null) {
diff --git a/modules/schema/src/main/java/org/apache/ignite/internal/schema/configuration/ExtendedTableConfigurationSchema.java b/modules/schema/src/main/java/org/apache/ignite/internal/schema/configuration/ExtendedTableConfigurationSchema.java
index 1387ba3daa..a7780ca66e 100644
--- a/modules/schema/src/main/java/org/apache/ignite/internal/schema/configuration/ExtendedTableConfigurationSchema.java
+++ b/modules/schema/src/main/java/org/apache/ignite/internal/schema/configuration/ExtendedTableConfigurationSchema.java
@@ -20,7 +20,6 @@ package org.apache.ignite.internal.schema.configuration;
import java.util.UUID;
import org.apache.ignite.configuration.annotation.InternalConfiguration;
import org.apache.ignite.configuration.annotation.InternalId;
-import org.apache.ignite.configuration.annotation.NamedConfigValue;
import org.apache.ignite.configuration.annotation.Value;
/**
@@ -34,13 +33,13 @@ public class ExtendedTableConfigurationSchema extends TableConfigurationSchema {
public UUID id;
/**
- * Serialized version of an affinity assignments. Currently configuration doesn't support neither collections nor array of arrays, so
+ * Serialized version of the affinity assignments. Currently, configuration supports neither collections nor arrays of arrays, so
* that serialization was chosen.
*/
@Value
public byte[] assignments;
- /** Schemas history as named list where name is schema version and value is serialized version of schema itself. */
- @NamedConfigValue
- public SchemaConfigurationSchema schemas;
+ /** Current schema id. Monotonically increasing number. */
+ @Value
+ public int schemaId;
}
diff --git a/modules/schema/src/main/java/org/apache/ignite/internal/schema/configuration/SchemaConfigurationSchema.java b/modules/schema/src/main/java/org/apache/ignite/internal/schema/configuration/SchemaConfigurationSchema.java
deleted file mode 100644
index eead4a22e6..0000000000
--- a/modules/schema/src/main/java/org/apache/ignite/internal/schema/configuration/SchemaConfigurationSchema.java
+++ /dev/null
@@ -1,31 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.schema.configuration;
-
-import org.apache.ignite.configuration.annotation.Config;
-import org.apache.ignite.configuration.annotation.Value;
-
-/**
- * Configuration schema of a table schema. Part of an internal configuration.
- */
-@Config
-public class SchemaConfigurationSchema {
- /** Serialized table schema configuration. */
- @Value
- public byte[] schema;
-}
diff --git a/modules/schema/src/main/java/org/apache/ignite/internal/schema/registry/SchemaRegistryImpl.java b/modules/schema/src/main/java/org/apache/ignite/internal/schema/registry/SchemaRegistryImpl.java
index 5c79fcfbe3..c75e9ac2b1 100644
--- a/modules/schema/src/main/java/org/apache/ignite/internal/schema/registry/SchemaRegistryImpl.java
+++ b/modules/schema/src/main/java/org/apache/ignite/internal/schema/registry/SchemaRegistryImpl.java
@@ -21,6 +21,7 @@ import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentNavigableMap;
import java.util.concurrent.ConcurrentSkipListMap;
@@ -48,7 +49,7 @@ public class SchemaRegistryImpl implements SchemaRegistry {
private volatile int lastVer;
/** Schema store. */
- private final Function<Integer, SchemaDescriptor> history;
+ private final Function<Integer, CompletableFuture<SchemaDescriptor>> history;
/** The method to provide the latest schema version on cluster. */
private final IntSupplier latestVersionStore;
@@ -61,7 +62,7 @@ public class SchemaRegistryImpl implements SchemaRegistry {
* @param initialSchema Initial schema.
*/
public SchemaRegistryImpl(
- Function<Integer, SchemaDescriptor> history,
+ Function<Integer, CompletableFuture<SchemaDescriptor>> history,
IntSupplier latestVersionStore,
SchemaDescriptor initialSchema
) {
@@ -86,7 +87,11 @@ public class SchemaRegistryImpl implements SchemaRegistry {
return desc;
}
- desc = history.apply(ver);
+ CompletableFuture<SchemaDescriptor> descFut = history.apply(ver);
+
+ if (descFut != null) {
+ desc = descFut.join();
+ }
if (desc != null) {
schemaCache.putIfAbsent(ver, desc);
diff --git a/modules/schema/src/main/java/org/apache/ignite/internal/schema/registry/UpgradingRowAdapter.java b/modules/schema/src/main/java/org/apache/ignite/internal/schema/registry/UpgradingRowAdapter.java
index db1c9cfbd7..7d357804de 100644
--- a/modules/schema/src/main/java/org/apache/ignite/internal/schema/registry/UpgradingRowAdapter.java
+++ b/modules/schema/src/main/java/org/apache/ignite/internal/schema/registry/UpgradingRowAdapter.java
@@ -77,7 +77,7 @@ class UpgradingRowAdapter extends Row {
* Map column.
*
* @param colIdx Column index in source schema.
- * @return Column index in targer schema.
+ * @return Column index in target schema.
*/
private int mapColumn(int colIdx) throws InvalidTypeException {
return mapper.map(colIdx);
diff --git a/modules/schema/src/test/java/org/apache/ignite/internal/schema/registry/SchemaRegistryImplTest.java b/modules/schema/src/test/java/org/apache/ignite/internal/schema/registry/SchemaRegistryImplTest.java
index 8f9535a639..f6aacd6761 100644
--- a/modules/schema/src/test/java/org/apache/ignite/internal/schema/registry/SchemaRegistryImplTest.java
+++ b/modules/schema/src/test/java/org/apache/ignite/internal/schema/registry/SchemaRegistryImplTest.java
@@ -29,6 +29,7 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
import java.util.Arrays;
import java.util.Map;
+import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;
import org.apache.ignite.internal.schema.Column;
import org.apache.ignite.internal.schema.SchemaDescriptor;
@@ -343,7 +344,7 @@ public class SchemaRegistryImplTest {
new Column("valStringCol", STRING, true)
});
- Map<Integer, SchemaDescriptor> history = schemaHistory(schemaV1, schemaV2);
+ Map<Integer, CompletableFuture<SchemaDescriptor>> history = schemaHistory(schemaV1, schemaV2);
final SchemaRegistryImpl reg = new SchemaRegistryImpl(history::get, () -> INITIAL_SCHEMA_VERSION, schemaV2);
@@ -409,7 +410,7 @@ public class SchemaRegistryImplTest {
new Column[]{new Column("keyLongCol", INT64, false)},
new Column[]{new Column("valStringCol", STRING, true)});
- Map<Integer, SchemaDescriptor> history = schemaHistory(schemaV2, schemaV3);
+ Map<Integer, CompletableFuture<SchemaDescriptor>> history = schemaHistory(schemaV2, schemaV3);
final SchemaRegistryImpl reg = new SchemaRegistryImpl(history::get, () -> INITIAL_SCHEMA_VERSION, schemaV3);
@@ -475,7 +476,7 @@ public class SchemaRegistryImplTest {
new Column("valStringCol", STRING, true)
});
- Map<Integer, SchemaDescriptor> history = schemaHistory(schemaV2, schemaV3, schemaV4);
+ Map<Integer, CompletableFuture<SchemaDescriptor>> history = schemaHistory(schemaV2, schemaV3, schemaV4);
final SchemaRegistryImpl reg = new SchemaRegistryImpl(history::get, () -> INITIAL_SCHEMA_VERSION, schemaV4);
@@ -576,8 +577,8 @@ public class SchemaRegistryImplTest {
* @param history Table schema history.
* @return Schema history map.
*/
- private Map<Integer, SchemaDescriptor> schemaHistory(SchemaDescriptor... history) {
- return Arrays.stream(history).collect(Collectors.toMap(SchemaDescriptor::version, e -> e));
+ private Map<Integer, CompletableFuture<SchemaDescriptor>> schemaHistory(SchemaDescriptor... history) {
+ return Arrays.stream(history).collect(Collectors.toMap(SchemaDescriptor::version, CompletableFuture::completedFuture));
}
/**
diff --git a/modules/schema/src/test/java/org/apache/ignite/internal/schema/registry/UpgradingRowAdapterTest.java b/modules/schema/src/test/java/org/apache/ignite/internal/schema/registry/UpgradingRowAdapterTest.java
index 76aa829f37..d08a6035c6 100644
--- a/modules/schema/src/test/java/org/apache/ignite/internal/schema/registry/UpgradingRowAdapterTest.java
+++ b/modules/schema/src/test/java/org/apache/ignite/internal/schema/registry/UpgradingRowAdapterTest.java
@@ -17,6 +17,7 @@
package org.apache.ignite.internal.schema.registry;
+import static java.util.concurrent.CompletableFuture.completedFuture;
import static org.apache.ignite.internal.schema.NativeTypes.BYTES;
import static org.apache.ignite.internal.schema.NativeTypes.DATE;
import static org.apache.ignite.internal.schema.NativeTypes.DOUBLE;
@@ -151,12 +152,14 @@ public class UpgradingRowAdapterTest {
ByteBufferRow row = new ByteBufferRow(serializeValuesToRow(schema, values));
// Validate row.
- validateRow(values, new SchemaRegistryImpl(v -> v == 1 ? schema : schema2, () -> INITIAL_SCHEMA_VERSION, schema), row);
+ validateRow(values, new SchemaRegistryImpl(v -> v == 1 ? completedFuture(schema) : completedFuture(schema2),
+ () -> INITIAL_SCHEMA_VERSION, schema), row);
// Validate upgraded row.
values.add(addedColumnIndex, null);
- validateRow(values, new SchemaRegistryImpl(v -> v == 1 ? schema : schema2, () -> schema2.version(), schema2), row);
+ validateRow(values, new SchemaRegistryImpl(v -> v == 1 ? completedFuture(schema) : completedFuture(schema2),
+ () -> schema2.version(), schema2), row);
}
private void validateRow(List<Object> values, SchemaRegistryImpl schemaRegistry, ByteBufferRow binaryRow) {
diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ddl/DdlCommandHandler.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ddl/DdlCommandHandler.java
index 72df17a58f..b3927b655d 100644
--- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ddl/DdlCommandHandler.java
+++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ddl/DdlCommandHandler.java
@@ -282,43 +282,51 @@ public class DdlCommandHandler {
* @return {@code true} if the full columns set is applied successfully. Otherwise, returns {@code false}.
*/
private CompletableFuture<Boolean> addColumnInternal(String fullName, List<ColumnDefinition> colsDef, boolean ignoreColumnExistance) {
- AtomicBoolean ret = new AtomicBoolean(true);
+ AtomicBoolean retUsr = new AtomicBoolean(true);
return tableManager.alterTableAsync(
fullName,
- chng -> chng.changeColumns(cols -> {
- ret.set(true); // Reset state if closure have been restarted.
+ chng -> {
+ AtomicBoolean retTbl = new AtomicBoolean();
- Map<String, String> colNamesToOrders = columnOrdersToNames(chng.columns());
+ chng.changeColumns(cols -> {
+ retUsr.set(true); // Reset state if closure have been restarted.
- List<ColumnDefinition> colsDef0;
+ Map<String, String> colNamesToOrders = columnOrdersToNames(chng.columns());
- if (ignoreColumnExistance) {
- colsDef0 = colsDef.stream().filter(k -> {
- if (colNamesToOrders.containsKey(k.name())) {
- ret.set(false);
+ List<ColumnDefinition> colsDef0;
- return false;
- } else {
- return true;
- }
- }).collect(Collectors.toList());
- } else {
- colsDef.stream()
- .filter(k -> colNamesToOrders.containsKey(k.name()))
- .findAny()
- .ifPresent(c -> {
- throw new ColumnAlreadyExistsException(c.name());
- });
-
- colsDef0 = colsDef;
- }
+ if (ignoreColumnExistance) {
+ colsDef0 = colsDef.stream().filter(k -> {
+ if (colNamesToOrders.containsKey(k.name())) {
+ retUsr.set(false);
- for (ColumnDefinition col : colsDef0) {
- cols.create(col.name(), colChg -> convertColumnDefinition(col, colChg));
- }
- })
- ).thenApply(v -> ret.get());
+ return false;
+ } else {
+ return true;
+ }
+ }).collect(Collectors.toList());
+ } else {
+ colsDef.stream()
+ .filter(k -> colNamesToOrders.containsKey(k.name()))
+ .findAny()
+ .ifPresent(c -> {
+ throw new ColumnAlreadyExistsException(c.name());
+ });
+
+ colsDef0 = colsDef;
+ }
+
+ for (ColumnDefinition col : colsDef0) {
+ cols.create(col.name(), colChg -> convertColumnDefinition(col, colChg));
+ }
+
+ retTbl.set(!colsDef0.isEmpty());
+ });
+
+ return retTbl.get();
+ }
+ ).thenApply(v -> retUsr.get());
}
private void convertColumnDefinition(ColumnDefinition definition, ColumnChange columnChange) {
@@ -368,37 +376,40 @@ public class DdlCommandHandler {
return tableManager.alterTableAsync(
tableName,
- chng -> chng.changeColumns(cols -> {
- ret.set(true); // Reset state if closure have been restarted.
+ chng -> {
+ chng.changeColumns(cols -> {
+ ret.set(true); // Reset state if closure have been restarted.
- PrimaryKeyView priKey = chng.primaryKey();
+ PrimaryKeyView priKey = chng.primaryKey();
- Map<String, String> colNamesToOrders = columnOrdersToNames(chng.columns());
+ Map<String, String> colNamesToOrders = columnOrdersToNames(chng.columns());
- Set<String> colNames0 = new HashSet<>();
+ Set<String> colNames0 = new HashSet<>();
- Set<String> primaryCols = Set.of(priKey.columns());
+ Set<String> primaryCols = Set.of(priKey.columns());
+
+ for (String colName : colNames) {
+ if (!colNamesToOrders.containsKey(colName)) {
+ ret.set(false);
- for (String colName : colNames) {
- if (!colNamesToOrders.containsKey(colName)) {
- ret.set(false);
+ if (!ignoreColumnExistence) {
+ throw new ColumnNotFoundException(DEFAULT_SCHEMA_NAME, tableName, colName);
+ }
+ } else {
+ colNames0.add(colName);
+ }
- if (!ignoreColumnExistence) {
- throw new ColumnNotFoundException(DEFAULT_SCHEMA_NAME, tableName, colName);
+ if (primaryCols.contains(colName)) {
+ throw new SqlException(DEL_PK_COMUMN_CONSTRAINT_ERR, IgniteStringFormatter
+ .format("Can`t delete column, belongs to primary key: [name={}]", colName));
}
- } else {
- colNames0.add(colName);
}
- if (primaryCols.contains(colName)) {
- throw new SqlException(DEL_PK_COMUMN_CONSTRAINT_ERR, IgniteStringFormatter
- .format("Can`t delete column, belongs to primary key: [name={}]", colName));
- }
- }
+ colNames0.forEach(k -> cols.delete(colNamesToOrders.get(k)));
+ });
- colNames0.forEach(k -> cols.delete(colNamesToOrders.get(k)));
- }))
- .thenApply(v -> ret.get());
+ return ret.get();
+ }).thenApply(v -> ret.get());
}
private static void convert(NativeType colType, ColumnTypeChange colTypeChg) {
diff --git a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/StopCalciteModuleTest.java b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/StopCalciteModuleTest.java
index e2d560e357..6ee94075e2 100644
--- a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/StopCalciteModuleTest.java
+++ b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/StopCalciteModuleTest.java
@@ -151,7 +151,7 @@ public class StopCalciteModuleTest {
new Column[]{new Column(1, "VAL", NativeTypes.INT32, false)}
);
- schemaReg = new SchemaRegistryImpl((v) -> schemaDesc, () -> INITIAL_SCHEMA_VERSION, schemaDesc);
+ schemaReg = new SchemaRegistryImpl((v) -> completedFuture(schemaDesc), () -> INITIAL_SCHEMA_VERSION, schemaDesc);
when(tbl.name()).thenReturn("TEST");
diff --git a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/MockedStructuresTest.java b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/MockedStructuresTest.java
index 0cad686300..2d9618294f 100644
--- a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/MockedStructuresTest.java
+++ b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/MockedStructuresTest.java
@@ -18,17 +18,12 @@
package org.apache.ignite.internal.sql.engine.exec;
import static java.util.concurrent.CompletableFuture.completedFuture;
-import static org.apache.ignite.internal.schema.configuration.storage.UnknownDataStorageConfigurationSchema.UNKNOWN_DATA_STORAGE;
import static org.apache.ignite.internal.storage.rocksdb.RocksDbStorageEngine.ENGINE_NAME;
import static org.apache.ignite.internal.storage.rocksdb.configuration.schema.RocksDbStorageEngineConfigurationSchema.DEFAULT_DATA_REGION_NAME;
import static org.apache.ignite.internal.testframework.IgniteTestUtils.await;
-import static org.apache.ignite.internal.testframework.IgniteTestUtils.runAsync;
-import static org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCondition;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.equalTo;
-import static org.hamcrest.Matchers.instanceOf;
-import static org.hamcrest.Matchers.startsWith;
import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
@@ -37,12 +32,10 @@ import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.mockStatic;
-import static org.mockito.Mockito.never;
-import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
+import java.util.Iterator;
import java.util.List;
-import java.util.Map;
import java.util.Objects;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
@@ -61,36 +54,26 @@ import org.apache.ignite.internal.raft.Loza;
import org.apache.ignite.internal.raft.storage.impl.LocalLogStorageFactory;
import org.apache.ignite.internal.schema.SchemaDescriptor;
import org.apache.ignite.internal.schema.SchemaManager;
-import org.apache.ignite.internal.schema.SchemaRegistry;
import org.apache.ignite.internal.schema.SchemaUtils;
import org.apache.ignite.internal.schema.configuration.TableView;
import org.apache.ignite.internal.schema.configuration.TablesConfiguration;
import org.apache.ignite.internal.sql.engine.AsyncCursor.BatchedResult;
import org.apache.ignite.internal.sql.engine.AsyncSqlCursor;
-import org.apache.ignite.internal.sql.engine.QueryContext;
import org.apache.ignite.internal.sql.engine.SqlQueryProcessor;
-import org.apache.ignite.internal.sql.engine.property.PropertiesHolder;
-import org.apache.ignite.internal.sql.engine.session.SessionId;
import org.apache.ignite.internal.storage.DataStorageManager;
import org.apache.ignite.internal.storage.DataStorageModules;
import org.apache.ignite.internal.storage.impl.TestDataStorageModule;
-import org.apache.ignite.internal.storage.impl.TestStorageEngine;
import org.apache.ignite.internal.storage.impl.schema.TestDataStorageConfigurationSchema;
-import org.apache.ignite.internal.storage.impl.schema.TestDataStorageView;
import org.apache.ignite.internal.storage.rocksdb.RocksDbDataStorageModule;
import org.apache.ignite.internal.storage.rocksdb.configuration.schema.RocksDbDataStorageConfigurationSchema;
import org.apache.ignite.internal.storage.rocksdb.configuration.schema.RocksDbDataStorageView;
import org.apache.ignite.internal.storage.rocksdb.configuration.schema.RocksDbStorageEngineConfiguration;
-import org.apache.ignite.internal.table.TableImpl;
import org.apache.ignite.internal.table.distributed.TableManager;
import org.apache.ignite.internal.testframework.IgniteAbstractTest;
-import org.apache.ignite.internal.tx.InternalTransaction;
import org.apache.ignite.internal.tx.TxManager;
+import org.apache.ignite.internal.util.Cursor;
import org.apache.ignite.lang.ByteArray;
-import org.apache.ignite.lang.ColumnAlreadyExistsException;
-import org.apache.ignite.lang.ColumnNotFoundException;
import org.apache.ignite.lang.IgniteException;
-import org.apache.ignite.lang.IndexAlreadyExistsException;
import org.apache.ignite.lang.NodeStoppingException;
import org.apache.ignite.lang.TableAlreadyExistsException;
import org.apache.ignite.lang.TableNotFoundException;
@@ -106,7 +89,6 @@ import org.apache.ignite.sql.SqlException;
import org.jetbrains.annotations.Nullable;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
-import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.Mock;
@@ -217,6 +199,8 @@ public class MockedStructuresTest extends IgniteAbstractTest {
when(rm.messagingService()).thenReturn(mock(MessagingService.class));
when(rm.topologyService()).thenReturn(mock(TopologyService.class));
+ mockMetastore();
+
revisionUpdater = (Function<Long, CompletableFuture<?>> function) -> {
function.apply(0L).join();
@@ -241,7 +225,7 @@ public class MockedStructuresTest extends IgniteAbstractTest {
dataStorageManager.start();
- schemaManager = new SchemaManager(revisionUpdater, tblsCfg);
+ schemaManager = new SchemaManager(revisionUpdater, tblsCfg, msm);
schemaManager.start();
@@ -274,33 +258,20 @@ public class MockedStructuresTest extends IgniteAbstractTest {
.get(1, TimeUnit.SECONDS);
}
- /**
- * Checks inner transactions are initialized correctly.
- */
- @Test
- public void testInnerTxInitiated() throws Exception {
- SessionId sesId = queryProc.createSession(1000, PropertiesHolder.fromMap(Map.of()));
-
- InternalTransaction tx = mock(InternalTransaction.class);
-
- when(tm.begin()).thenReturn(tx);
-
- String sql = "CREATE TABLE TEST (c1 int PRIMARY KEY, c2 varbinary(255))";
-
- CompletableFuture<AsyncSqlCursor<List<Object>>> f = queryProc.querySingleAsync(sesId, QueryContext.of(), sql);
-
- AsyncSqlCursor<List<Object>> asyncRes = f.get();
-
- asyncRes.closeAsync();
-
- verify(tm, never()).begin();
+ /** Stubs {@code msm.prefix(...)} to return a mocked cursor whose iterator has no elements. */
+ private void mockMetastore() throws Exception {
+ Cursor cursorMocked = mock(Cursor.class);
+ Iterator itMock = mock(Iterator.class);
+ when(itMock.hasNext()).thenReturn(false);
+ when(msm.prefix(any())).thenReturn(cursorMocked);
+ when(cursorMocked.iterator()).thenReturn(itMock);
}
/**
* Tests create a table through public API.
*/
@Test
- public void testCreateTable() {
+ public void testCreateTable() throws Exception {
SqlQueryProcessor finalQueryProc = queryProc;
String curMethodName = getCurrentMethodName();
@@ -337,21 +308,6 @@ public class MockedStructuresTest extends IgniteAbstractTest {
assertDoesNotThrow(() -> await(finalQueryProc.queryAsync("PUBLIC", finalNewTblSql3).get(0)));
}
- /**
- * Tests create a table with multiple pk through public API.
- */
- @Test
- public void testCreateTableMultiplePk() {
- String curMethodName = getCurrentMethodName();
-
- String newTblSql = String.format("CREATE TABLE %s (c1 int, c2 int NOT NULL DEFAULT 1, c3 int, primary key(c1, c2))", curMethodName);
-
- readFirst(queryProc.queryAsync("PUBLIC", newTblSql));
-
- assertTrue(tblManager.tables().stream().anyMatch(t -> t.name()
- .equalsIgnoreCase(curMethodName)));
- }
-
/**
* Tests create and drop table through public API.
*/
@@ -384,197 +340,6 @@ public class MockedStructuresTest extends IgniteAbstractTest {
.equalsIgnoreCase("PUBLIC." + curMethodName)));
}
- /**
- * Tests alter and drop columns through public API.
- */
- @Test
- public void testAlterAndDropSimpleCase() {
- SqlQueryProcessor finalQueryProc = queryProc;
-
- String curMethodName = getCurrentMethodName();
-
- String newTblSql = String.format("CREATE TABLE %s (c1 int PRIMARY KEY, c2 varchar(255))", curMethodName);
-
- readFirst(queryProc.queryAsync("PUBLIC", newTblSql));
-
- String alterCmd = String.format("ALTER TABLE %s ADD COLUMN (c3 varchar, c4 int)", curMethodName);
-
- readFirst(queryProc.queryAsync("PUBLIC", alterCmd));
-
- String alterCmd1 = String.format("ALTER TABLE %s ADD COLUMN c5 int NOT NULL DEFAULT 1", curMethodName);
-
- readFirst(queryProc.queryAsync("PUBLIC", alterCmd1));
-
- assertThrows(ColumnAlreadyExistsException.class, () -> readFirst(finalQueryProc.queryAsync("PUBLIC", alterCmd)));
-
- String alterCmdNoTbl = String.format("ALTER TABLE %s ADD COLUMN (c3 varchar, c4 int)", curMethodName + "_notExist");
-
- assertThrows(TableNotFoundException.class, () -> readFirst(queryProc.queryAsync("PUBLIC", alterCmdNoTbl)));
-
- String alterIfExistsCmd = String.format("ALTER TABLE IF EXISTS %s ADD COLUMN (c3 varchar, c4 int)", curMethodName + "NotExist");
-
- readFirst(queryProc.queryAsync("PUBLIC", alterIfExistsCmd));
-
- assertThrows(ColumnAlreadyExistsException.class, () -> readFirst(finalQueryProc.queryAsync("PUBLIC", alterCmd)));
-
- readFirst(finalQueryProc.queryAsync("PUBLIC", String.format("ALTER TABLE %s DROP COLUMN c4", curMethodName)));
-
- readFirst(queryProc.queryAsync("PUBLIC", String.format("ALTER TABLE %s ADD COLUMN IF NOT EXISTS c3 varchar", curMethodName)));
-
- readFirst(queryProc.queryAsync("PUBLIC", String.format("ALTER TABLE %s DROP COLUMN c3", curMethodName)));
-
- readFirst(queryProc.queryAsync("PUBLIC", String.format("ALTER TABLE %s DROP COLUMN IF EXISTS c3", curMethodName)));
-
- assertThrows(ColumnNotFoundException.class, () -> readFirst(finalQueryProc.queryAsync("PUBLIC",
- String.format("ALTER TABLE %s DROP COLUMN (c3, c4)", curMethodName))));
-
- assertThrows(IgniteException.class, () -> readFirst(finalQueryProc.queryAsync("PUBLIC",
- String.format("ALTER TABLE %s DROP COLUMN c1", curMethodName))));
- }
-
- /**
- * Tests alter add multiple columns through public API.
- */
- @Test
- public void testAlterColumnsAddBatch() {
- String curMethodName = getCurrentMethodName();
-
- readFirst(queryProc.queryAsync("PUBLIC", String.format("CREATE TABLE %s (c1 int PRIMARY KEY, c2 varchar(255))", curMethodName)));
-
- readFirst(queryProc.queryAsync("PUBLIC", String.format("ALTER TABLE %s ADD COLUMN (c3 varchar, c4 varchar)", curMethodName)));
-
- readFirst(queryProc
- .queryAsync("PUBLIC", String.format("ALTER TABLE %s ADD COLUMN IF NOT EXISTS (c3 varchar, c4 varchar)", curMethodName)));
-
- readFirst(
- queryProc.queryAsync("PUBLIC", String.format("ALTER TABLE %s ADD COLUMN IF NOT EXISTS (c3 varchar, c4 varchar, c5 varchar)",
- curMethodName)));
-
- SqlQueryProcessor finalQueryProc = queryProc;
-
- assertThrows(ColumnAlreadyExistsException.class, () -> readFirst(finalQueryProc.queryAsync("PUBLIC",
- String.format("ALTER TABLE %s ADD COLUMN (c5 varchar)", curMethodName))));
- }
-
- /**
- * Tests alter drop multiple columns through public API.
- */
- @Test
- public void testAlterColumnsDropBatch() {
- String curMethodName = getCurrentMethodName();
-
- readFirst(queryProc.queryAsync("PUBLIC", String.format("CREATE TABLE %s "
- + "(c1 int PRIMARY KEY, c2 decimal(10), c3 varchar, c4 varchar, c5 varchar)", curMethodName)));
-
- readFirst(queryProc.queryAsync("PUBLIC", String.format("ALTER TABLE %s DROP COLUMN c4", curMethodName)));
-
- readFirst(queryProc.queryAsync("PUBLIC", String.format("ALTER TABLE %s DROP COLUMN IF EXISTS (c3, c4, c5)", curMethodName)));
-
- SqlQueryProcessor finalQueryProc = queryProc;
-
- assertThrows(ColumnNotFoundException.class, () -> readFirst(finalQueryProc.queryAsync("PUBLIC",
- String.format("ALTER TABLE %s DROP COLUMN c4", curMethodName))));
- }
-
- /**
- * Tests create a table through public API.
- */
- @Disabled("https://issues.apache.org/jira/browse/IGNITE-16032")
- @Test
- public void testCreateDropIndex() {
- SqlQueryProcessor finalQueryProc = queryProc;
-
- String curMethodName = getCurrentMethodName();
-
- String newTblSql = String.format("CREATE TABLE %s (c1 int PRIMARY KEY, c2 varbinary(255)) with partitions=1", curMethodName);
-
- readFirst(queryProc.queryAsync("PUBLIC", newTblSql));
-
- assertTrue(tblManager.tables().stream().anyMatch(t -> t.name()
- .equalsIgnoreCase("PUBLIC." + curMethodName)));
-
- readFirst(queryProc.queryAsync("PUBLIC", String.format("CREATE INDEX index1 ON %s (c1)", curMethodName)));
-
- readFirst(queryProc.queryAsync("PUBLIC", String.format("CREATE INDEX IF NOT EXISTS index1 ON %s (c1)", curMethodName)));
-
- readFirst(queryProc.queryAsync("PUBLIC", String.format("CREATE INDEX index2 ON %s (c1)", curMethodName)));
-
- readFirst(queryProc.queryAsync("PUBLIC", String.format("CREATE INDEX index3 ON %s (c2)", curMethodName)));
-
- assertThrows(IndexAlreadyExistsException.class, () ->
- readFirst(finalQueryProc.queryAsync("PUBLIC", String.format("CREATE INDEX index3 ON %s (c1)", curMethodName))));
-
- assertThrows(IgniteException.class, () ->
- readFirst(finalQueryProc
- .queryAsync("PUBLIC", String.format("CREATE INDEX index_3 ON %s (c1)", curMethodName + "_nonExist"))));
-
- readFirst(queryProc.queryAsync("PUBLIC", String.format("CREATE INDEX index4 ON %s (c2 desc, c1 asc)", curMethodName)));
-
- readFirst(queryProc.queryAsync("PUBLIC", String.format("DROP INDEX index4 ON %s", curMethodName)));
-
- readFirst(queryProc.queryAsync("PUBLIC", String.format("CREATE INDEX index4 ON %s (c2 desc, c1 asc)", curMethodName)));
-
- readFirst(queryProc.queryAsync("PUBLIC", String.format("DROP INDEX index4 ON %s", curMethodName)));
-
- readFirst(queryProc.queryAsync("PUBLIC", String.format("DROP INDEX IF EXISTS index4 ON %s", curMethodName)));
- }
-
- @Disabled("https://issues.apache.org/jira/browse/IGNITE-17197")
- @Test
- void createTableWithEngine() throws Exception {
- String method = getCurrentMethodName();
-
- // Without engine.
- assertDoesNotThrow(() -> readFirst(queryProc.queryAsync(
- "PUBLIC",
- String.format("CREATE TABLE %s (c1 int PRIMARY KEY, c2 varbinary(255))", method + 0)
- )));
-
- assertThat(tableView(method + 0).dataStorage(), instanceOf(RocksDbDataStorageView.class));
-
- // With existing engine.
- assertDoesNotThrow(() -> readFirst(queryProc.queryAsync(
- "PUBLIC",
- String.format(
- "CREATE TABLE %s (c1 int PRIMARY KEY, c2 varbinary(255)) engine %s",
- method + 1,
- TestStorageEngine.ENGINE_NAME
- )
- )));
-
- assertThat(tableView(method + 1).dataStorage(), instanceOf(TestDataStorageView.class));
-
- // With existing engine in mixed case
- assertDoesNotThrow(() -> readFirst(queryProc.queryAsync(
- "PUBLIC",
- String.format("CREATE TABLE %s (c1 int PRIMARY KEY, c2 varbinary(255)) engine %s", method + 2, "\"RocksDb\"")
- )));
-
- assertThat(tableView(method + 2).dataStorage(), instanceOf(RocksDbDataStorageView.class));
-
- IgniteException exception = assertThrows(
- IgniteException.class,
- () -> readFirst(queryProc.queryAsync(
- "PUBLIC",
- String.format("CREATE TABLE %s (c1 int PRIMARY KEY, c2 varbinary(255)) engine %s", method + 3, method)
- ))
- );
-
- assertThat(exception.getMessage(), startsWith("Unexpected data storage engine"));
-
- tblsCfg.defaultDataStorage().update(UNKNOWN_DATA_STORAGE).get(1, TimeUnit.SECONDS);
-
- exception = assertThrows(
- IgniteException.class,
- () -> readFirst(queryProc.queryAsync(
- "PUBLIC",
- String.format("CREATE TABLE %s (c1 int PRIMARY KEY, c2 varbinary(255))", method + 4)
- ))
- );
-
- assertThat(exception.getMessage(), startsWith("Default data storage is not defined"));
- }
-
@Test
void createTableWithTableOptions() {
String method = getCurrentMethodName();
@@ -660,34 +425,6 @@ public class MockedStructuresTest extends IgniteAbstractTest {
);
}
- /**
- * Tests that schema that is not applied yet is accessible after some time.
- *
- * @throws Exception If test has failed.
- */
- @Test
- public void testSchemaForTheFutureUpdate() throws Exception {
- String curMethodName = getCurrentMethodName();
-
- readFirst(queryProc.queryAsync("PUBLIC", String.format("CREATE TABLE %s "
- + "(c1 int PRIMARY KEY, c2 decimal(10), c3 varchar, c4 varchar, c5 varchar)", curMethodName)));
-
- SchemaRegistry schemaRegistry = (((TableImpl) tblManager.tables().get(0)).schemaView());
-
- runAsync(() -> {
- Thread.sleep(3000);
- readFirst(queryProc.queryAsync("PUBLIC", String.format("ALTER TABLE %s DROP COLUMN c4", curMethodName)));
- });
-
- int lastSchemaVersion = schemaRegistry.lastSchemaVersion();
-
- //Ensure that we can get schema for the future update
- assertTrue(waitForCondition(
- () -> schemaRegistry.schema(lastSchemaVersion + 1).version() == lastSchemaVersion + 1,
- 5000
- ));
- }
-
// todo copy-paste from TableManagerTest will be removed after https://issues.apache.org/jira/browse/IGNITE-16050
/**
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
index 35c33e3372..a9d3629fd7 100644
--- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
@@ -73,7 +73,6 @@ import org.apache.ignite.configuration.ConfigurationChangeException;
import org.apache.ignite.configuration.ConfigurationProperty;
import org.apache.ignite.configuration.notifications.ConfigurationNamedListListener;
import org.apache.ignite.configuration.notifications.ConfigurationNotificationEvent;
-import org.apache.ignite.configuration.validation.ConfigurationValidationException;
import org.apache.ignite.hlc.HybridClock;
import org.apache.ignite.internal.affinity.AffinityUtils;
import org.apache.ignite.internal.baseline.BaselineManager;
@@ -94,9 +93,7 @@ import org.apache.ignite.internal.raft.server.RaftGroupOptions;
import org.apache.ignite.internal.raft.storage.impl.LogStorageFactoryCreator;
import org.apache.ignite.internal.replicator.ReplicaManager;
import org.apache.ignite.internal.replicator.ReplicaService;
-import org.apache.ignite.internal.schema.SchemaDescriptor;
import org.apache.ignite.internal.schema.SchemaManager;
-import org.apache.ignite.internal.schema.SchemaUtils;
import org.apache.ignite.internal.schema.configuration.ExtendedTableChange;
import org.apache.ignite.internal.schema.configuration.ExtendedTableConfiguration;
import org.apache.ignite.internal.schema.configuration.ExtendedTableView;
@@ -106,7 +103,6 @@ import org.apache.ignite.internal.schema.configuration.TableView;
import org.apache.ignite.internal.schema.configuration.TablesConfiguration;
import org.apache.ignite.internal.schema.event.SchemaEvent;
import org.apache.ignite.internal.schema.event.SchemaEventParameters;
-import org.apache.ignite.internal.schema.marshaller.schema.SchemaSerializerImpl;
import org.apache.ignite.internal.storage.DataStorageManager;
import org.apache.ignite.internal.storage.MvPartitionStorage;
import org.apache.ignite.internal.storage.RowId;
@@ -188,7 +184,7 @@ public class TableManager extends Producer<TableEvent, TableEventParameters> imp
/**
* If this property is set to {@code true} then an attempt to get the configuration property directly from the meta storage will be
* skipped, and the local property will be returned.
- * TODO: IGNITE-16774 This property and overall approach, access configuration directly through the Metostorage,
+ * TODO: IGNITE-16774 This property and overall approach, access configuration directly through the Metastorage,
* TODO: will be removed after fix of the issue.
*/
private final boolean getMetadataLocallyOnly = IgniteSystemProperties.getBoolean("IGNITE_GET_METADATA_LOCALLY_ONLY");
@@ -1148,34 +1144,16 @@ public class TableManager extends Producer<TableEvent, TableEventParameters> imp
extConfCh.changeTableId(intTableId);
+ extConfCh.changeSchemaId(INITIAL_SCHEMA_VERSION);
+
tableCreateFuts.put(extConfCh.id(), tblFut);
// Affinity assignments calculation.
extConfCh.changeAssignments(ByteUtils.toBytes(AffinityUtils.calculateAssignments(
- baselineMgr.nodes(),
- tableChange.partitions(),
- tableChange.replicas(),
- HashSet::new)))
- // Table schema preparation.
- .changeSchemas(schemasCh -> schemasCh.create(
- String.valueOf(INITIAL_SCHEMA_VERSION),
- schemaCh -> {
- SchemaDescriptor schemaDesc;
-
- //TODO IGNITE-15747 Remove try-catch and force configuration
- // validation here to ensure a valid configuration passed to
- // prepareSchemaDescriptor() method.
- try {
- schemaDesc = SchemaUtils.prepareSchemaDescriptor(
- ((ExtendedTableView) tableChange).schemas().size(),
- tableChange);
- } catch (IllegalArgumentException ex) {
- throw new ConfigurationValidationException(ex.getMessage());
- }
-
- schemaCh.changeSchema(SchemaSerializerImpl.INSTANCE.serialize(schemaDesc));
- }
- ));
+ baselineMgr.nodes(),
+ tableChange.partitions(),
+ tableChange.replicas(),
+ HashSet::new)));
});
})).exceptionally(t -> {
Throwable ex = getRootCause(t);
@@ -1210,7 +1188,7 @@ public class TableManager extends Producer<TableEvent, TableEventParameters> imp
* </ul>
* @see TableNotFoundException
*/
- public CompletableFuture<Void> alterTableAsync(String name, Consumer<TableChange> tableChange) {
+ public CompletableFuture<Void> alterTableAsync(String name, Function<TableChange, Boolean> tableChange) {
if (!busyLock.enterBusy()) {
throw new IgniteException(new NodeStoppingException());
}
@@ -1221,58 +1199,28 @@ public class TableManager extends Producer<TableEvent, TableEventParameters> imp
}
}
- /** See {@link #alterTableAsync(String, Consumer)} for details. */
- private CompletableFuture<Void> alterTableAsyncInternal(String name, Consumer<TableChange> tableChange) {
+ /** See {@link #alterTableAsync(String, Function)} for details. */
+ private CompletableFuture<Void> alterTableAsyncInternal(String name, Function<TableChange, Boolean> tableChange) {
CompletableFuture<Void> tblFut = new CompletableFuture<>();
tableAsync(name).thenAccept(tbl -> {
if (tbl == null) {
tblFut.completeExceptionally(new TableNotFoundException(DEFAULT_SCHEMA_NAME, name));
} else {
- TableImpl tblImpl = (TableImpl) tbl;
-
tablesCfg.tables().change(ch -> {
if (ch.get(name) == null) {
throw new TableNotFoundException(DEFAULT_SCHEMA_NAME, name);
}
ch.update(name, tblCh -> {
- tableChange.accept(tblCh);
-
- ((ExtendedTableChange) tblCh).changeSchemas(schemasCh ->
- schemasCh.createOrUpdate(String.valueOf(schemasCh.size() + 1), schemaCh -> {
- ExtendedTableView currTableView = (ExtendedTableView) tablesCfg.tables().get(name).value();
-
- SchemaDescriptor descriptor;
-
- //TODO IGNITE-15747 Remove try-catch and force configuration validation
- // here to ensure a valid configuration passed to prepareSchemaDescriptor() method.
- try {
- descriptor = SchemaUtils.prepareSchemaDescriptor(
- ((ExtendedTableView) tblCh).schemas().size(),
- tblCh);
-
- descriptor.columnMapping(SchemaUtils.columnMapper(
- tblImpl.schemaView().schema(currTableView.schemas().size()),
- currTableView.columns(),
- descriptor,
- tblCh.columns()));
- } catch (IllegalArgumentException ex) {
- // Convert unexpected exceptions here,
- // because validation actually happens later,
- // when bulk configuration update is applied.
- ConfigurationValidationException e =
- new ConfigurationValidationException(ex.getMessage());
-
- e.addSuppressed(ex);
-
- throw e;
- }
-
- schemaCh.changeSchema(SchemaSerializerImpl.INSTANCE.serialize(descriptor));
- }));
- }
- );
+ if (!tableChange.apply(tblCh)) {
+ return;
+ }
+
+ ExtendedTableChange exTblChange = (ExtendedTableChange) tblCh;
+
+ exTblChange.changeSchemaId(exTblChange.schemaId() + 1);
+ });
}).whenComplete((res, t) -> {
if (t != null) {
Throwable ex = getRootCause(t);
diff --git a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerTest.java b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerTest.java
index 1d02422b16..92ed4d5170 100644
--- a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerTest.java
+++ b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerTest.java
@@ -48,6 +48,7 @@ import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
+import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.UUID;
@@ -82,11 +83,9 @@ import org.apache.ignite.internal.schema.SchemaDescriptor;
import org.apache.ignite.internal.schema.SchemaManager;
import org.apache.ignite.internal.schema.SchemaUtils;
import org.apache.ignite.internal.schema.configuration.ExtendedTableChange;
-import org.apache.ignite.internal.schema.configuration.ExtendedTableView;
import org.apache.ignite.internal.schema.configuration.TableChange;
import org.apache.ignite.internal.schema.configuration.TableView;
import org.apache.ignite.internal.schema.configuration.TablesConfiguration;
-import org.apache.ignite.internal.schema.marshaller.schema.SchemaSerializerImpl;
import org.apache.ignite.internal.schema.testutils.SchemaConfigurationConverter;
import org.apache.ignite.internal.schema.testutils.builder.SchemaBuilders;
import org.apache.ignite.internal.schema.testutils.definition.ColumnType;
@@ -104,6 +103,7 @@ import org.apache.ignite.internal.thread.NamedThreadFactory;
import org.apache.ignite.internal.tx.LockManager;
import org.apache.ignite.internal.tx.TxManager;
import org.apache.ignite.internal.util.ByteUtils;
+import org.apache.ignite.internal.util.Cursor;
import org.apache.ignite.lang.ByteArray;
import org.apache.ignite.lang.IgniteException;
import org.apache.ignite.network.ClusterNode;
@@ -282,6 +282,8 @@ public class TableManagerTest extends IgniteAbstractTest {
SchemaBuilders.column("val", ColumnType.INT64).asNullable(true).build()
).withPrimaryKey("key").build();
+ mockMetastore();
+
tblsCfg.tables().change(tablesChange -> {
tablesChange.create(scmTbl.name(), tableChange -> {
(SchemaConfigurationConverter.convert(scmTbl, tableChange))
@@ -298,17 +300,7 @@ public class TableManagerTest extends IgniteAbstractTest {
assignment.add(new HashSet<>(Collections.singleton(node)));
}
- extConfCh.changeAssignments(ByteUtils.toBytes(assignment))
- .changeSchemas(schemasCh -> schemasCh.create(
- String.valueOf(1),
- schemaCh -> {
- SchemaDescriptor schemaDesc = SchemaUtils.prepareSchemaDescriptor(
- ((ExtendedTableView) tableChange).schemas().size(),
- tableChange);
-
- schemaCh.changeSchema(SchemaSerializerImpl.INSTANCE.serialize(schemaDesc));
- }
- ));
+ extConfCh.changeAssignments(ByteUtils.toBytes(assignment)).changeSchemaId(1);
});
}).join();
@@ -383,15 +375,18 @@ public class TableManagerTest extends IgniteAbstractTest {
.changeReplicas(REPLICAS)
.changePartitions(PARTITIONS);
- final Consumer<TableChange> addColumnChange = (TableChange change) ->
- change.changeColumns(cols -> {
- int colIdx = change.columns().namedListKeys().stream().mapToInt(Integer::parseInt).max().getAsInt() + 1;
+ final Function<TableChange, Boolean> addColumnChange = (TableChange change) -> {
+ change.changeColumns(cols -> {
+ int colIdx = change.columns().namedListKeys().stream().mapToInt(Integer::parseInt).max().getAsInt() + 1;
- cols.create(String.valueOf(colIdx),
- colChg -> SchemaConfigurationConverter.convert(SchemaBuilders.column("name", ColumnType.string()).build(),
- colChg));
+ cols.create(String.valueOf(colIdx),
+ colChg -> SchemaConfigurationConverter.convert(SchemaBuilders.column("name", ColumnType.string()).build(),
+ colChg));
- });
+ });
+
+ return true;
+ };
TableManager igniteTables = tableManager;
@@ -510,6 +505,8 @@ public class TableManagerTest extends IgniteAbstractTest {
.withPrimaryKey("key")
.build();
+ when(msm.put(any(), any())).thenReturn(completedFuture(null));
+
Table table = mockManagersAndCreateTable(scmTbl, tblManagerFut);
assertNotNull(table);
@@ -538,6 +535,15 @@ public class TableManagerTest extends IgniteAbstractTest {
return mockManagersAndCreateTableWithDelay(tableDefinition, tblManagerFut, null);
}
+ /** Stubs {@code msm.prefix(...)} to return a mocked cursor whose iterator has no elements. */
+ private void mockMetastore() throws Exception {
+ Cursor cursorMocked = mock(Cursor.class);
+ Iterator itMock = mock(Iterator.class);
+ when(itMock.hasNext()).thenReturn(false);
+ when(msm.prefix(any())).thenReturn(cursorMocked);
+ when(cursorMocked.iterator()).thenReturn(itMock);
+ }
+
/**
* Instantiates a table and prepares Table manager. When the latch would open, the method completes.
*
@@ -618,6 +624,8 @@ public class TableManagerTest extends IgniteAbstractTest {
.changePartitions(PARTITIONS)
);
+ mockMetastore();
+
assertTrue(createTblLatch.await(10, TimeUnit.SECONDS));
TableImpl tbl2 = (TableImpl) tbl2Fut.get();
@@ -758,7 +766,7 @@ public class TableManagerTest extends IgniteAbstractTest {
dsm = createDataStorageManager(configRegistry, workDir, rocksDbEngineConfig),
workDir,
msm,
- sm = new SchemaManager(revisionUpdater, tblsCfg),
+ sm = new SchemaManager(revisionUpdater, tblsCfg, msm),
budgetView -> new LocalLogStorageFactory(),
null
);