You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@shardingsphere.apache.org by zh...@apache.org on 2023/06/28 11:14:26 UTC

[shardingsphere] branch master updated: Add record exist check at no unique key migration E2E (#26653)

This is an automated email from the ASF dual-hosted git repository.

zhonghongsheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new 9228d184d16 Add record exist check at no unique key migration E2E  (#26653)
9228d184d16 is described below

commit 9228d184d16312dc9a48914671db6b16ba3a998b
Author: Xinze Guo <10...@users.noreply.github.com>
AuthorDate: Wed Jun 28 19:14:18 2023 +0800

    Add record exist check at no unique key migration E2E  (#26653)
    
    * Incr max wait time at register storage unit at E2E
    
    * Add record exist check at no unique key migration
    
    * Remove retry at PipelineContainerComposer.queryForListWithLog
    
    * Use AwaitTimeoutUtil
---
 .../pipeline/cases/PipelineContainerComposer.java  | 21 ++++---------
 .../test/e2e/data/pipeline/cases/cdc/CDCE2EIT.java |  8 +++--
 .../cases/migration/AbstractMigrationE2EIT.java    |  9 ++++--
 .../primarykey/IndexesMigrationE2EIT.java          |  9 +++---
 .../e2e/data/pipeline/util/AwaitTimeoutUtil.java   | 34 ++++++++++++++++++++++
 5 files changed, 55 insertions(+), 26 deletions(-)

diff --git a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/PipelineContainerComposer.java b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/PipelineContainerComposer.java
index c1c5349bddf..dbfee4598a0 100644
--- a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/PipelineContainerComposer.java
+++ b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/PipelineContainerComposer.java
@@ -404,20 +404,12 @@ public final class PipelineContainerComposer implements AutoCloseable {
      */
     public List<Map<String, Object>> queryForListWithLog(final DataSource dataSource, final String sql) {
         log.info("Query SQL: {}", sql);
-        int retryNumber = 0;
-        while (retryNumber <= 3) {
-            try (Connection connection = dataSource.getConnection()) {
-                ResultSet resultSet = connection.createStatement().executeQuery(sql);
-                return transformResultSetToList(resultSet);
-                // CHECKSTYLE:OFF
-            } catch (final SQLException | RuntimeException ex) {
-                // CHECKSTYLE:ON
-                log.error("Data access error, sql: {}.", sql, ex);
-            }
-            Awaitility.await().pollDelay(3L, TimeUnit.SECONDS).until(() -> true);
-            retryNumber++;
+        try (Connection connection = dataSource.getConnection()) {
+            ResultSet resultSet = connection.createStatement().executeQuery(sql);
+            return transformResultSetToList(resultSet);
+        } catch (final SQLException ex) {
+            throw new RuntimeException(ex);
         }
-        throw new RuntimeException("Can not get result from proxy.");
     }
     
     /**
@@ -547,10 +539,9 @@ public final class PipelineContainerComposer implements AutoCloseable {
      * Generate ShardingSphere data source from proxy.
      *
      * @return ShardingSphere data source
-     * @throws SQLException SQL exception
      */
     // TODO proxy support for some fields still needs to be optimized, such as binary of MySQL, after these problems are optimized, Proxy dataSource can be used.
-    public DataSource generateShardingSphereDataSourceFromProxy() throws SQLException {
+    public DataSource generateShardingSphereDataSourceFromProxy() {
         Awaitility.await().atMost(5L, TimeUnit.SECONDS).pollInterval(1L, TimeUnit.SECONDS).until(() -> null != getYamlRootConfig().getRules());
         YamlRootConfiguration rootConfig = getYamlRootConfig();
         ShardingSpherePreconditions.checkNotNull(rootConfig.getDataSources(), () -> new IllegalStateException("dataSources is null"));
diff --git a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/cdc/CDCE2EIT.java b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/cdc/CDCE2EIT.java
index 4aeacfff4e6..cd47ac95c63 100644
--- a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/cdc/CDCE2EIT.java
+++ b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/cdc/CDCE2EIT.java
@@ -49,6 +49,7 @@ import org.apache.shardingsphere.test.e2e.data.pipeline.framework.param.Pipeline
 import org.apache.shardingsphere.test.e2e.data.pipeline.framework.param.PipelineE2ESettings.PipelineE2EDatabaseSettings;
 import org.apache.shardingsphere.test.e2e.data.pipeline.framework.param.PipelineE2ETestCaseArgumentsProvider;
 import org.apache.shardingsphere.test.e2e.data.pipeline.framework.param.PipelineTestParameter;
+import org.apache.shardingsphere.test.e2e.data.pipeline.util.AwaitTimeoutUtil;
 import org.apache.shardingsphere.test.e2e.data.pipeline.util.DataSourceExecuteUtils;
 import org.apache.shardingsphere.test.e2e.env.container.atomic.constants.ProxyContainerConstants;
 import org.junit.jupiter.api.condition.EnabledIf;
@@ -105,8 +106,8 @@ class CDCE2EIT {
             for (String each : Arrays.asList(PipelineContainerComposer.DS_0, PipelineContainerComposer.DS_1)) {
                 containerComposer.registerStorageUnit(each);
             }
-            Awaitility.await().atMost(10L, TimeUnit.SECONDS).pollInterval(1, TimeUnit.SECONDS).until(() -> containerComposer.showStorageUnitsName()
-                    .containsAll(Arrays.asList(PipelineContainerComposer.DS_0, PipelineContainerComposer.DS_1)));
+            Awaitility.await().ignoreExceptions().atMost(AwaitTimeoutUtil.getTimeout(containerComposer.getDatabaseType()), TimeUnit.SECONDS).pollInterval(1, TimeUnit.SECONDS)
+                    .until(() -> containerComposer.showStorageUnitsName().containsAll(Arrays.asList(PipelineContainerComposer.DS_0, PipelineContainerComposer.DS_1)));
             createOrderTableRule(containerComposer);
             try (Connection connection = containerComposer.getProxyDataSource().getConnection()) {
                 initSchemaAndTable(containerComposer, connection, 3);
@@ -150,7 +151,8 @@ class CDCE2EIT {
     }
     
     private void createOrderTableRule(final PipelineContainerComposer containerComposer) throws SQLException {
-        containerComposer.proxyExecuteWithLog(CREATE_SHARDING_RULE_SQL, 2);
+        containerComposer.proxyExecuteWithLog(CREATE_SHARDING_RULE_SQL, 0);
+        Awaitility.await().atMost(20L, TimeUnit.SECONDS).pollInterval(2L, TimeUnit.SECONDS).until(() -> !containerComposer.queryForListWithLog("SHOW SHARDING TABLE RULE t_order").isEmpty());
     }
     
     private void initSchemaAndTable(final PipelineContainerComposer containerComposer, final Connection connection, final int sleepSeconds) throws SQLException {
diff --git a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/AbstractMigrationE2EIT.java b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/AbstractMigrationE2EIT.java
index 2750dc6850f..c6baade542a 100644
--- a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/AbstractMigrationE2EIT.java
+++ b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/AbstractMigrationE2EIT.java
@@ -24,6 +24,7 @@ import org.apache.shardingsphere.test.e2e.data.pipeline.cases.PipelineContainerC
 import org.apache.shardingsphere.test.e2e.data.pipeline.command.MigrationDistSQLCommand;
 import org.apache.shardingsphere.test.e2e.data.pipeline.env.PipelineE2EEnvironment;
 import org.apache.shardingsphere.test.e2e.data.pipeline.env.enums.PipelineEnvTypeEnum;
+import org.apache.shardingsphere.test.e2e.data.pipeline.util.AwaitTimeoutUtil;
 import org.apache.shardingsphere.test.e2e.env.container.atomic.util.DatabaseTypeUtils;
 import org.awaitility.Awaitility;
 import org.opengauss.util.PSQLException;
@@ -74,7 +75,8 @@ public abstract class AbstractMigrationE2EIT {
                 .replace("${ds3}", containerComposer.getActualJdbcUrlTemplate(PipelineContainerComposer.DS_3, true))
                 .replace("${ds4}", containerComposer.getActualJdbcUrlTemplate(PipelineContainerComposer.DS_4, true));
         containerComposer.proxyExecuteWithLog(addTargetResource, 0);
-        Awaitility.await().atMost(5L, TimeUnit.SECONDS).pollInterval(500L, TimeUnit.MILLISECONDS).until(() -> 3 == containerComposer.showStorageUnitsName().size());
+        Awaitility.await().ignoreExceptions().atMost(AwaitTimeoutUtil.getTimeout(containerComposer.getDatabaseType()), TimeUnit.SECONDS).pollInterval(1L, TimeUnit.SECONDS)
+                .until(() -> 3 == containerComposer.showStorageUnitsName().size());
     }
     
     protected void createSourceSchema(final PipelineContainerComposer containerComposer, final String schemaName) throws SQLException {
@@ -102,7 +104,8 @@ public abstract class AbstractMigrationE2EIT {
     
     protected void createTargetOrderTableRule(final PipelineContainerComposer containerComposer) throws SQLException {
         containerComposer.proxyExecuteWithLog(migrationDistSQL.getCreateTargetOrderTableRule(), 0);
-        Awaitility.await().atMost(4L, TimeUnit.SECONDS).pollInterval(500L, TimeUnit.MILLISECONDS).until(() -> !containerComposer.queryForListWithLog("SHOW SHARDING TABLE RULE t_order").isEmpty());
+        Awaitility.await().atMost(AwaitTimeoutUtil.getTimeout(containerComposer.getDatabaseType()), TimeUnit.SECONDS).pollInterval(1L, TimeUnit.SECONDS)
+                .until(() -> !containerComposer.queryForListWithLog("SHOW SHARDING TABLE RULE t_order").isEmpty());
     }
     
     protected void createTargetOrderTableEncryptRule(final PipelineContainerComposer containerComposer) throws SQLException {
@@ -111,7 +114,7 @@ public abstract class AbstractMigrationE2EIT {
     
     protected void createTargetOrderItemTableRule(final PipelineContainerComposer containerComposer) throws SQLException {
         containerComposer.proxyExecuteWithLog(migrationDistSQL.getCreateTargetOrderItemTableRule(), 0);
-        Awaitility.await().atMost(4L, TimeUnit.SECONDS).pollInterval(500L, TimeUnit.MILLISECONDS)
+        Awaitility.await().atMost(AwaitTimeoutUtil.getTimeout(containerComposer.getDatabaseType()), TimeUnit.SECONDS).pollInterval(1L, TimeUnit.SECONDS)
                 .until(() -> !containerComposer.queryForListWithLog("SHOW SHARDING TABLE RULE t_order_item").isEmpty());
     }
     
diff --git a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/primarykey/IndexesMigrationE2EIT.java b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/primarykey/IndexesMigrationE2EIT.java
index 5a61df80f40..d91a6e22541 100644
--- a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/primarykey/IndexesMigrationE2EIT.java
+++ b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/primarykey/IndexesMigrationE2EIT.java
@@ -91,13 +91,12 @@ class IndexesMigrationE2EIT extends AbstractMigrationE2EIT {
             KeyGenerateAlgorithm keyGenerateAlgorithm = new UUIDKeyGenerateAlgorithm();
             // TODO PostgreSQL update delete events not support if table without unique keys at increment task.
             final Consumer<DataSource> incrementalTaskFn = dataSource -> {
-                Object orderId = keyGenerateAlgorithm.generateKey();
-                insertOneOrder(containerComposer, orderId);
                 if (containerComposer.getDatabaseType() instanceof MySQLDatabaseType) {
-                    updateOneOrder(containerComposer, orderId, "updated");
-                    deleteOneOrder(containerComposer, orderId, "updated");
-                    insertOneOrder(containerComposer, keyGenerateAlgorithm.generateKey());
+                    doCreateUpdateDelete(containerComposer, keyGenerateAlgorithm.generateKey());
                 }
+                Object orderId = keyGenerateAlgorithm.generateKey();
+                insertOneOrder(containerComposer, orderId);
+                containerComposer.assertOrderRecordExist(dataSource, "t_order", orderId);
             };
             assertMigrationSuccess(containerComposer, sql, "user_id", keyGenerateAlgorithm, consistencyCheckAlgorithmType, incrementalTaskFn);
         }
diff --git a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/util/AwaitTimeoutUtil.java b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/util/AwaitTimeoutUtil.java
new file mode 100644
index 00000000000..37f89ac7f8a
--- /dev/null
+++ b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/util/AwaitTimeoutUtil.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.test.e2e.data.pipeline.util;
+
+import org.apache.shardingsphere.infra.database.type.DatabaseType;
+import org.apache.shardingsphere.infra.database.type.dialect.OpenGaussDatabaseType;
+
+public final class AwaitTimeoutUtil {
+    
+    /**
+     * Get timeout in seconds; openGauss reloads metadata slowly, so it gets a longer timeout.
+     *
+     * @param databaseType database type.
+     * @return timeout
+     */
+    public static long getTimeout(final DatabaseType databaseType) {
+        return databaseType instanceof OpenGaussDatabaseType ? 60 : 10;
+    }
+}