You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by tk...@apache.org on 2024/02/12 06:43:03 UTC
(ignite-3) branch main updated: IGNITE-21316 Add manual schema sync to ItRebalanceDistributedTest (#3201)
This is an automated email from the ASF dual-hosted git repository.
tkalkirill pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new c8254bc147 IGNITE-21316 Add manual schema sync to ItRebalanceDistributedTest (#3201)
c8254bc147 is described below
commit c8254bc147c0168143db59864276c5661ac51304
Author: Roman Puchkovskiy <ro...@gmail.com>
AuthorDate: Mon Feb 12 10:42:57 2024 +0400
IGNITE-21316 Add manual schema sync to ItRebalanceDistributedTest (#3201)
---
.../schemasync/ItSchemaSyncMultiNodeTest.java | 204 +++++++++++++++++++++
.../internal/ClusterPerTestIntegrationTest.java | 6 +-
.../rebalance/ItRebalanceDistributedTest.java | 13 +-
3 files changed, 221 insertions(+), 2 deletions(-)
diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/schemasync/ItSchemaSyncMultiNodeTest.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/schemasync/ItSchemaSyncMultiNodeTest.java
new file mode 100644
index 0000000000..952786586b
--- /dev/null
+++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/schemasync/ItSchemaSyncMultiNodeTest.java
@@ -0,0 +1,204 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.schemasync;
+
+import static java.util.concurrent.CompletableFuture.runAsync;
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
+import static java.util.concurrent.TimeUnit.SECONDS;
+import static org.apache.ignite.internal.SessionUtils.executeUpdate;
+import static org.apache.ignite.internal.test.WatchListenerInhibitor.metastorageEventsInhibitor;
+import static org.apache.ignite.internal.testframework.matchers.CompletableFutureExceptionMatcher.willTimeoutIn;
+import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.notNullValue;
+import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
+
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import org.apache.ignite.internal.ClusterPerTestIntegrationTest;
+import org.apache.ignite.internal.test.WatchListenerInhibitor;
+import org.apache.ignite.table.KeyValueView;
+import org.apache.ignite.table.Table;
+import org.apache.ignite.table.Tuple;
+import org.junit.jupiter.api.Disabled;
+import org.junit.jupiter.api.Test;
+
+/**
+ * Tests for basic Schema Synchronization properties that have to be verified on a cluster of several Ignite nodes.
+ */
+@SuppressWarnings("resource")
+class ItSchemaSyncMultiNodeTest extends ClusterPerTestIntegrationTest {
+ private static final int NODES_TO_START = 2;
+
+ private static final int NODE_0_INDEX = 0;
+ private static final int NODE_1_INDEX = 1;
+
+ private static final String TABLE_NAME = "test";
+
+ @Override
+ protected int initialNodes() {
+ return NODES_TO_START; // Two nodes: the tests run a DDL on node 0 and the dependent operation on node 1.
+ }
+
+ @Override
+ protected int[] cmgMetastoreNodes() {
+ return new int[]{ 0 }; // Index 0 only; presumably keeps the inhibited node 1 out of CMG/Metastorage - TODO confirm.
+ }
+
+ /**
+ * Makes sure that, once a DDL executed on one node has completed, a SQL DML executed afterwards on another
+ * node of the cluster sees the results of that DDL (i.e. it cannot observe the old schema).
+ */
+ @Test
+ void sqlDmlAfterDdlOnAnotherNodeSeesDdlResults() {
+ createTableAtNode(NODE_0_INDEX);
+
+ assertDoesNotThrow(() -> executeSql(NODE_1_INDEX, "INSERT INTO " + TABLE_NAME + " (id, val) VALUES (1, 'one')"));
+
+ assertThat(executeSql(NODE_1_INDEX, "SELECT val FROM " + TABLE_NAME + " WHERE id = 1"), is(List.of(List.of("one"))));
+ }
+
+ /**
+ * Makes sure that, once a DDL executed on one node has completed, a SQL DML executed afterwards on another
+ * node of the cluster sees the results of that DDL (i.e. it cannot observe the old schema).
+ *
+ * <p>Metastorage events are inhibited on node 1, so the DML is expected to hang until inhibition stops and then succeed.
+ */
+ @Test
+ @Disabled("https://issues.apache.org/jira/browse/IGNITE-21512")
+ void sqlDmlAfterDdlOnAnotherNodeSeesDdlResultsWithInhibitor() {
+ WatchListenerInhibitor inhibitorOnNode1 = metastorageEventsInhibitor(node(NODE_1_INDEX));
+ inhibitorOnNode1.startInhibit();
+
+ try {
+ createTableAtNode(NODE_0_INDEX);
+
+ CompletableFuture<?> insertFuture = runAsync(
+ () -> executeSql(NODE_1_INDEX, "INSERT INTO " + TABLE_NAME + " (id, val) VALUES (1, 'one')")
+ );
+
+ assertThat(insertFuture, willTimeoutIn(100, MILLISECONDS));
+
+ inhibitorOnNode1.stopInhibit();
+
+ assertThat(insertFuture, willCompleteSuccessfully());
+
+ assertThat(executeSql(NODE_1_INDEX, "SELECT val FROM " + TABLE_NAME + " WHERE id = 1"), is(List.of(List.of("one"))));
+ } finally {
+ inhibitorOnNode1.stopInhibit();
+ }
+ }
+
+ private void createTableAtNode(int nodeIndex) { // Creates the test table (id, val) through a session on the given node.
+ cluster.doInSession(nodeIndex, session -> {
+ executeUpdate("CREATE TABLE " + TABLE_NAME + " (id int PRIMARY KEY, val varchar)", session);
+ });
+ }
+
+ /**
+ * Makes sure that, once a DDL executed on one node has completed, a KV operation executed afterwards on another
+ * node of the cluster sees the results of that DDL (i.e. it cannot observe the old schema).
+ */
+ @Test
+ void kvDmlAfterDdlOnAnotherNodeSeesDdlResults() {
+ createTableAtNode(NODE_0_INDEX);
+
+ Table tableAtNode1 = node(NODE_1_INDEX).tables().table(TABLE_NAME); // Obtained before the column is added below.
+
+ addColumnAtNode(NODE_0_INDEX);
+
+ Tuple key = Tuple.create().set("id", 1);
+ assertDoesNotThrow(() -> tableAtNode1.keyValueView().put(null, key, Tuple.create().set("added", 42)));
+
+ Tuple retrievedTuple = tableAtNode1.keyValueView().get(null, key);
+ assertThat(retrievedTuple, is(notNullValue()));
+ assertThat(retrievedTuple.value("added"), is(42));
+ }
+
+ /**
+ * Makes sure that, once a DDL executed on one node has completed, a KV operation executed afterwards on another
+ * node of the cluster sees the results of that DDL (i.e. it cannot observe the old schema).
+ *
+ * <p>Metastorage events are inhibited on node 1, so the put is expected to hang until inhibition stops and then succeed.
+ */
+ @Test
+ void kvDmlAfterDdlOnAnotherNodeSeesDdlResultsWithInhibitor() {
+ createTableAtNode(NODE_0_INDEX);
+
+ Table tableAtNode1 = node(NODE_1_INDEX).tables().table(TABLE_NAME);
+ KeyValueView<Tuple, Tuple> kvViewAtNode1 = tableAtNode1.keyValueView();
+
+ Tuple key = Tuple.create().set("id", 1);
+
+ WatchListenerInhibitor inhibitorOnNode1 = metastorageEventsInhibitor(node(NODE_1_INDEX));
+ inhibitorOnNode1.startInhibit();
+
+ try {
+ addColumnAtNode(NODE_0_INDEX);
+
+ CompletableFuture<?> putFuture = kvViewAtNode1.putAsync(null, key, Tuple.create().set("added", 42));
+
+ assertThat(putFuture, willTimeoutIn(100, MILLISECONDS));
+
+ inhibitorOnNode1.stopInhibit();
+
+ assertThat(putFuture, willCompleteSuccessfully());
+ } finally {
+ inhibitorOnNode1.stopInhibit();
+ }
+
+ Tuple retrievedTuple = kvViewAtNode1.get(null, key);
+ assertThat(retrievedTuple, is(notNullValue()));
+ assertThat(retrievedTuple.value("added"), is(42));
+ }
+
+ private void addColumnAtNode(int nodeIndex) { // Adds the int column "added" to the test table through a session on the given node.
+ cluster.doInSession(nodeIndex, session -> {
+ executeUpdate("ALTER TABLE " + TABLE_NAME + " ADD COLUMN added int", session);
+ });
+ }
+
+ /**
+ * Makes sure that, once a DDL executed on one node has completed, another DDL executed afterwards on another node sees
+ * the results of the first one: CREATE TABLE on node 1 hangs while its Metastorage events are inhibited, then succeeds.
+ */
+ @Test
+ void ddlAfterDdlOnAnotherNodeSeesFirstDdlResults() {
+ WatchListenerInhibitor inhibitorOnNode1 = metastorageEventsInhibitor(node(NODE_1_INDEX));
+ inhibitorOnNode1.startInhibit();
+
+ try {
+ cluster.doInSession(NODE_0_INDEX, session -> {
+ executeUpdate("CREATE ZONE test_zone", session);
+ });
+
+ CompletableFuture<Void> tableCreationFuture = runAsync(() -> cluster.doInSession(NODE_1_INDEX, session -> {
+ executeUpdate("CREATE TABLE " + TABLE_NAME + " (id int PRIMARY KEY, val varchar) WITH primary_zone='TEST_ZONE'", session);
+ }));
+
+ assertThat(tableCreationFuture, willTimeoutIn(1, SECONDS));
+
+ inhibitorOnNode1.stopInhibit();
+
+ assertThat(tableCreationFuture, willCompleteSuccessfully());
+ } finally {
+ inhibitorOnNode1.stopInhibit();
+ }
+ }
+}
diff --git a/modules/runner/src/testFixtures/java/org/apache/ignite/internal/ClusterPerTestIntegrationTest.java b/modules/runner/src/testFixtures/java/org/apache/ignite/internal/ClusterPerTestIntegrationTest.java
index 300762d0f6..e21b6f4ee7 100644
--- a/modules/runner/src/testFixtures/java/org/apache/ignite/internal/ClusterPerTestIntegrationTest.java
+++ b/modules/runner/src/testFixtures/java/org/apache/ignite/internal/ClusterPerTestIntegrationTest.java
@@ -209,7 +209,11 @@ public abstract class ClusterPerTestIntegrationTest extends IgniteIntegrationTes
}
protected final List<List<Object>> executeSql(String sql, Object... args) {
- IgniteImpl ignite = node(0);
+ return executeSql(0, sql, args);
+ }
+
+ protected final List<List<Object>> executeSql(int nodeIndex, String sql, Object... args) {
+ IgniteImpl ignite = node(nodeIndex);
return ClusterPerClassIntegrationTest.sql(ignite, null, sql, args);
}
diff --git a/modules/table/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceDistributedTest.java b/modules/table/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceDistributedTest.java
index d3ac1d1001..cf5314a741 100644
--- a/modules/table/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceDistributedTest.java
+++ b/modules/table/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceDistributedTest.java
@@ -167,6 +167,7 @@ import org.apache.ignite.internal.table.TableViewInternal;
import org.apache.ignite.internal.table.distributed.TableManager;
import org.apache.ignite.internal.table.distributed.TableMessageGroup;
import org.apache.ignite.internal.table.distributed.raft.snapshot.outgoing.OutgoingSnapshotsManager;
+import org.apache.ignite.internal.table.distributed.schema.SchemaSyncService;
import org.apache.ignite.internal.table.distributed.schema.SchemaSyncServiceImpl;
import org.apache.ignite.internal.testframework.BaseIgniteAbstractTest;
import org.apache.ignite.internal.testframework.TestIgnitionManager;
@@ -894,6 +895,8 @@ public class ItRebalanceDistributedTest extends BaseIgniteAbstractTest {
private final CatalogManager catalogManager;
+ private final SchemaSyncService schemaSyncService;
+
private final ClockWaiter clockWaiter;
private final List<IgniteComponent> nodeComponents = new CopyOnWriteArrayList<>();
@@ -1099,7 +1102,7 @@ public class ItRebalanceDistributedTest extends BaseIgniteAbstractTest {
schemaManager = new SchemaManager(registry, catalogManager, metaStorageManager);
- var schemaSyncService = new SchemaSyncServiceImpl(metaStorageManager.clusterTime(), delayDurationMsSupplier);
+ schemaSyncService = new SchemaSyncServiceImpl(metaStorageManager.clusterTime(), delayDurationMsSupplier);
distributionZoneManager = new DistributionZoneManager(
name,
@@ -1178,6 +1181,10 @@ public class ItRebalanceDistributedTest extends BaseIgniteAbstractTest {
indexManager = new IndexManager(schemaManager, tableManager, catalogManager, metaStorageManager, registry);
}
+ private void waitForMetadataCompletenessAtNow() { // Asserts that schema sync for the current hybridClock timestamp completes.
+ assertThat(schemaSyncService.waitForMetadataCompleteness(hybridClock.now()), willCompleteSuccessfully());
+ }
+
/**
* Starts the created components.
*/
@@ -1407,10 +1414,14 @@ public class ItRebalanceDistributedTest extends BaseIgniteAbstractTest {
}
private static void alterZone(Node node, String zoneName, int replicas) {
+ node.waitForMetadataCompletenessAtNow();
+
DistributionZonesTestUtil.alterZone(node.catalogManager, zoneName, replicas);
}
private static void createTable(Node node, String zoneName, String tableName) {
+ node.waitForMetadataCompletenessAtNow();
+
TableTestUtils.createTable(
node.catalogManager,
DEFAULT_SCHEMA_NAME,