You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by tk...@apache.org on 2024/02/12 06:43:03 UTC

(ignite-3) branch main updated: IGNITE-21316 Add manual schema sync to ItRebalanceDistributedTest (#3201)

This is an automated email from the ASF dual-hosted git repository.

tkalkirill pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new c8254bc147 IGNITE-21316 Add manual schema sync to ItRebalanceDistributedTest (#3201)
c8254bc147 is described below

commit c8254bc147c0168143db59864276c5661ac51304
Author: Roman Puchkovskiy <ro...@gmail.com>
AuthorDate: Mon Feb 12 10:42:57 2024 +0400

    IGNITE-21316 Add manual schema sync to ItRebalanceDistributedTest (#3201)
---
 .../schemasync/ItSchemaSyncMultiNodeTest.java      | 204 +++++++++++++++++++++
 .../internal/ClusterPerTestIntegrationTest.java    |   6 +-
 .../rebalance/ItRebalanceDistributedTest.java      |  13 +-
 3 files changed, 221 insertions(+), 2 deletions(-)

diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/schemasync/ItSchemaSyncMultiNodeTest.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/schemasync/ItSchemaSyncMultiNodeTest.java
new file mode 100644
index 0000000000..952786586b
--- /dev/null
+++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/schemasync/ItSchemaSyncMultiNodeTest.java
@@ -0,0 +1,204 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.schemasync;
+
+import static java.util.concurrent.CompletableFuture.runAsync;
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
+import static java.util.concurrent.TimeUnit.SECONDS;
+import static org.apache.ignite.internal.SessionUtils.executeUpdate;
+import static org.apache.ignite.internal.test.WatchListenerInhibitor.metastorageEventsInhibitor;
+import static org.apache.ignite.internal.testframework.matchers.CompletableFutureExceptionMatcher.willTimeoutIn;
+import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.notNullValue;
+import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
+
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import org.apache.ignite.internal.ClusterPerTestIntegrationTest;
+import org.apache.ignite.internal.test.WatchListenerInhibitor;
+import org.apache.ignite.table.KeyValueView;
+import org.apache.ignite.table.Table;
+import org.apache.ignite.table.Tuple;
+import org.junit.jupiter.api.Disabled;
+import org.junit.jupiter.api.Test;
+
+/**
+ * Tests about basic Schema Synchronization properties that should be tested using a few Ignite node.
+ */
+@SuppressWarnings("resource")
+class ItSchemaSyncMultiNodeTest extends ClusterPerTestIntegrationTest {
+    private static final int NODES_TO_START = 2;
+
+    private static final int NODE_0_INDEX = 0;
+    private static final int NODE_1_INDEX = 1;
+
+    private static final String TABLE_NAME = "test";
+
+    @Override
+    protected int initialNodes() {
+        return NODES_TO_START;
+    }
+
+    @Override
+    protected int[] cmgMetastoreNodes() {
+        return new int[]{ 0 };
+    }
+
+    /**
+     * Makes sure that, if a DDL is executed on one node, its future is waited out, then a SQL DML is executed
+     * on another node of the cluster, that DML sees the results of the DDL (i.e. it cannot see the old schema).
+     */
+    @Test
+    void sqlDmlAfterDdlOnAnotherNodeSeesDdlResults() {
+        createTableAtNode(NODE_0_INDEX);
+
+        assertDoesNotThrow(() -> executeSql(NODE_1_INDEX, "INSERT INTO " + TABLE_NAME + " (id, val) VALUES (1, 'one')"));
+
+        assertThat(executeSql(NODE_1_INDEX, "SELECT val FROM " + TABLE_NAME + " WHERE id = 1"), is(List.of(List.of("one"))));
+    }
+
+    /**
+     * Makes sure that, if a DDL is executed on one node, its future is waited out, then a SQL DML is executed
+     * on another node of the cluster, that DML sees the results of the DDL (i.e. it cannot see the old schema).
+     *
+     * <p>This particular scenario uses metastorage inhibiting to make sure that schema sync is not missed.
+     */
+    @Test
+    @Disabled("https://issues.apache.org/jira/browse/IGNITE-21512")
+    void sqlDmlAfterDdlOnAnotherNodeSeesDdlResultsWithInhibitor() {
+        WatchListenerInhibitor inhibitorOnNode1 = metastorageEventsInhibitor(node(NODE_1_INDEX));
+        inhibitorOnNode1.startInhibit();
+
+        try {
+            createTableAtNode(NODE_0_INDEX);
+
+            CompletableFuture<?> insertFuture = runAsync(
+                    () -> executeSql(NODE_1_INDEX, "INSERT INTO " + TABLE_NAME + " (id, val) VALUES (1, 'one')")
+            );
+
+            assertThat(insertFuture, willTimeoutIn(100, MILLISECONDS));
+
+            inhibitorOnNode1.stopInhibit();
+
+            assertThat(insertFuture, willCompleteSuccessfully());
+
+            assertThat(executeSql(NODE_1_INDEX, "SELECT val FROM " + TABLE_NAME + " WHERE id = 1"), is(List.of(List.of("one"))));
+        } finally {
+            inhibitorOnNode1.stopInhibit();
+        }
+    }
+
+    private void createTableAtNode(int nodeIndex) {
+        cluster.doInSession(nodeIndex, session -> {
+            executeUpdate("CREATE TABLE " + TABLE_NAME + " (id int PRIMARY KEY, val varchar)", session);
+        });
+    }
+
+    /**
+     * Makes sure that, if a DDL is executed on one node, its future is waited out, then a KV operation is executed
+     * on another node of the cluster, that KV operation sees the results of the DDL (i.e. it cannot see the old schema).
+     */
+    @Test
+    void kvDmlAfterDdlOnAnotherNodeSeesDdlResults() {
+        createTableAtNode(NODE_0_INDEX);
+
+        Table tableAtNode1 = node(NODE_1_INDEX).tables().table(TABLE_NAME);
+
+        addColumnAtNode(NODE_0_INDEX);
+
+        Tuple key = Tuple.create().set("id", 1);
+        assertDoesNotThrow(() -> tableAtNode1.keyValueView().put(null, key, Tuple.create().set("added", 42)));
+
+        Tuple retrievedTuple = tableAtNode1.keyValueView().get(null, key);
+        assertThat(retrievedTuple, is(notNullValue()));
+        assertThat(retrievedTuple.value("added"), is(42));
+    }
+
+    /**
+     * Makes sure that, if a DDL is executed on one node, its future is waited out, then a KV operation is executed
+     * on another node of the cluster, that KV operation sees the results of the DDL (i.e. it cannot see the old schema).
+     *
+     * <p>This particular scenario uses metastorage inhibiting to make sure that schema sync is not missed.
+     */
+    @Test
+    void kvDmlAfterDdlOnAnotherNodeSeesDdlResultsWithInhibitor() {
+        createTableAtNode(NODE_0_INDEX);
+
+        Table tableAtNode1 = node(NODE_1_INDEX).tables().table(TABLE_NAME);
+        KeyValueView<Tuple, Tuple> kvViewAtNode1 = tableAtNode1.keyValueView();
+
+        Tuple key = Tuple.create().set("id", 1);
+
+        WatchListenerInhibitor inhibitorOnNode1 = metastorageEventsInhibitor(node(NODE_1_INDEX));
+        inhibitorOnNode1.startInhibit();
+
+        try {
+            addColumnAtNode(NODE_0_INDEX);
+
+            CompletableFuture<?> putFuture = kvViewAtNode1.putAsync(null, key, Tuple.create().set("added", 42));
+
+            assertThat(putFuture, willTimeoutIn(100, MILLISECONDS));
+
+            inhibitorOnNode1.stopInhibit();
+
+            assertThat(putFuture, willCompleteSuccessfully());
+        } finally {
+            inhibitorOnNode1.stopInhibit();
+        }
+
+        Tuple retrievedTuple = kvViewAtNode1.get(null, key);
+        assertThat(retrievedTuple, is(notNullValue()));
+        assertThat(retrievedTuple.value("added"), is(42));
+    }
+
+    private void addColumnAtNode(int nodeIndex) {
+        cluster.doInSession(nodeIndex, session -> {
+            executeUpdate("ALTER TABLE " + TABLE_NAME + " ADD COLUMN added int", session);
+        });
+    }
+
+    /**
+     * Makes sure that, if a DDL is executed on one node, its future is waited out, then another DDL is executed
+     * on another node of the cluster, that second DDL sees the results of the first DDL (i.e. it cannot see the old schema).
+     */
+    @Test
+    void ddlAfterDdlOnAnotherNodeSeesFirstDdlResults() {
+        WatchListenerInhibitor inhibitorOnNode1 = metastorageEventsInhibitor(node(NODE_1_INDEX));
+        inhibitorOnNode1.startInhibit();
+
+        try {
+            cluster.doInSession(NODE_0_INDEX, session -> {
+                executeUpdate("CREATE ZONE test_zone", session);
+            });
+
+            CompletableFuture<Void> tableCreationFuture = runAsync(() -> cluster.doInSession(NODE_1_INDEX, session -> {
+                executeUpdate("CREATE TABLE " + TABLE_NAME + " (id int PRIMARY KEY, val varchar) WITH primary_zone='TEST_ZONE'", session);
+            }));
+
+            assertThat(tableCreationFuture, willTimeoutIn(1, SECONDS));
+
+            inhibitorOnNode1.stopInhibit();
+
+            assertThat(tableCreationFuture, willCompleteSuccessfully());
+        } finally {
+            inhibitorOnNode1.stopInhibit();
+        }
+    }
+}
diff --git a/modules/runner/src/testFixtures/java/org/apache/ignite/internal/ClusterPerTestIntegrationTest.java b/modules/runner/src/testFixtures/java/org/apache/ignite/internal/ClusterPerTestIntegrationTest.java
index 300762d0f6..e21b6f4ee7 100644
--- a/modules/runner/src/testFixtures/java/org/apache/ignite/internal/ClusterPerTestIntegrationTest.java
+++ b/modules/runner/src/testFixtures/java/org/apache/ignite/internal/ClusterPerTestIntegrationTest.java
@@ -209,7 +209,11 @@ public abstract class ClusterPerTestIntegrationTest extends IgniteIntegrationTes
     }
 
     protected final List<List<Object>> executeSql(String sql, Object... args) {
-        IgniteImpl ignite = node(0);
+        return executeSql(0, sql, args);
+    }
+
+    protected final List<List<Object>> executeSql(int nodeIndex, String sql, Object... args) {
+        IgniteImpl ignite = node(nodeIndex);
 
         return ClusterPerClassIntegrationTest.sql(ignite, null, sql, args);
     }
diff --git a/modules/table/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceDistributedTest.java b/modules/table/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceDistributedTest.java
index d3ac1d1001..cf5314a741 100644
--- a/modules/table/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceDistributedTest.java
+++ b/modules/table/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceDistributedTest.java
@@ -167,6 +167,7 @@ import org.apache.ignite.internal.table.TableViewInternal;
 import org.apache.ignite.internal.table.distributed.TableManager;
 import org.apache.ignite.internal.table.distributed.TableMessageGroup;
 import org.apache.ignite.internal.table.distributed.raft.snapshot.outgoing.OutgoingSnapshotsManager;
+import org.apache.ignite.internal.table.distributed.schema.SchemaSyncService;
 import org.apache.ignite.internal.table.distributed.schema.SchemaSyncServiceImpl;
 import org.apache.ignite.internal.testframework.BaseIgniteAbstractTest;
 import org.apache.ignite.internal.testframework.TestIgnitionManager;
@@ -894,6 +895,8 @@ public class ItRebalanceDistributedTest extends BaseIgniteAbstractTest {
 
         private final CatalogManager catalogManager;
 
+        private final SchemaSyncService schemaSyncService;
+
         private final ClockWaiter clockWaiter;
 
         private final List<IgniteComponent> nodeComponents = new CopyOnWriteArrayList<>();
@@ -1099,7 +1102,7 @@ public class ItRebalanceDistributedTest extends BaseIgniteAbstractTest {
 
             schemaManager = new SchemaManager(registry, catalogManager, metaStorageManager);
 
-            var schemaSyncService = new SchemaSyncServiceImpl(metaStorageManager.clusterTime(), delayDurationMsSupplier);
+            schemaSyncService = new SchemaSyncServiceImpl(metaStorageManager.clusterTime(), delayDurationMsSupplier);
 
             distributionZoneManager = new DistributionZoneManager(
                     name,
@@ -1178,6 +1181,10 @@ public class ItRebalanceDistributedTest extends BaseIgniteAbstractTest {
             indexManager = new IndexManager(schemaManager, tableManager, catalogManager, metaStorageManager, registry);
         }
 
+        private void waitForMetadataCompletenessAtNow() {
+            assertThat(schemaSyncService.waitForMetadataCompleteness(hybridClock.now()), willCompleteSuccessfully());
+        }
+
         /**
          * Starts the created components.
          */
@@ -1407,10 +1414,14 @@ public class ItRebalanceDistributedTest extends BaseIgniteAbstractTest {
     }
 
     private static void alterZone(Node node, String zoneName, int replicas) {
+        node.waitForMetadataCompletenessAtNow();
+
         DistributionZonesTestUtil.alterZone(node.catalogManager, zoneName, replicas);
     }
 
     private static void createTable(Node node, String zoneName, String tableName) {
+        node.waitForMetadataCompletenessAtNow();
+
         TableTestUtils.createTable(
                 node.catalogManager,
                 DEFAULT_SCHEMA_NAME,