You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@shardingsphere.apache.org by zh...@apache.org on 2023/06/28 11:14:26 UTC
[shardingsphere] branch master updated: Add record exist check at no unique key migration E2E (#26653)
This is an automated email from the ASF dual-hosted git repository.
zhonghongsheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new 9228d184d16 Add record exist check at no unique key migration E2E (#26653)
9228d184d16 is described below
commit 9228d184d16312dc9a48914671db6b16ba3a998b
Author: Xinze Guo <10...@users.noreply.github.com>
AuthorDate: Wed Jun 28 19:14:18 2023 +0800
Add record exist check at no unique key migration E2E (#26653)
* Incr max wait time at register storage unit at E2E
* Add record exist check at no unique key migration
* Remove retry at PipelineContainerComposer.queryForListWithLog
* Use AwaitTimeoutUtil
---
.../pipeline/cases/PipelineContainerComposer.java | 21 ++++---------
.../test/e2e/data/pipeline/cases/cdc/CDCE2EIT.java | 8 +++--
.../cases/migration/AbstractMigrationE2EIT.java | 9 ++++--
.../primarykey/IndexesMigrationE2EIT.java | 9 +++---
.../e2e/data/pipeline/util/AwaitTimeoutUtil.java | 34 ++++++++++++++++++++++
5 files changed, 55 insertions(+), 26 deletions(-)
diff --git a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/PipelineContainerComposer.java b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/PipelineContainerComposer.java
index c1c5349bddf..dbfee4598a0 100644
--- a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/PipelineContainerComposer.java
+++ b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/PipelineContainerComposer.java
@@ -404,20 +404,12 @@ public final class PipelineContainerComposer implements AutoCloseable {
*/
public List<Map<String, Object>> queryForListWithLog(final DataSource dataSource, final String sql) {
log.info("Query SQL: {}", sql);
- int retryNumber = 0;
- while (retryNumber <= 3) {
- try (Connection connection = dataSource.getConnection()) {
- ResultSet resultSet = connection.createStatement().executeQuery(sql);
- return transformResultSetToList(resultSet);
- // CHECKSTYLE:OFF
- } catch (final SQLException | RuntimeException ex) {
- // CHECKSTYLE:ON
- log.error("Data access error, sql: {}.", sql, ex);
- }
- Awaitility.await().pollDelay(3L, TimeUnit.SECONDS).until(() -> true);
- retryNumber++;
+ try (Connection connection = dataSource.getConnection()) {
+ ResultSet resultSet = connection.createStatement().executeQuery(sql);
+ return transformResultSetToList(resultSet);
+ } catch (final SQLException ex) {
+ throw new RuntimeException(ex);
}
- throw new RuntimeException("Can not get result from proxy.");
}
/**
@@ -547,10 +539,9 @@ public final class PipelineContainerComposer implements AutoCloseable {
* Generate ShardingSphere data source from proxy.
*
* @return ShardingSphere data source
- * @throws SQLException SQL exception
*/
// TODO proxy support for some fields still needs to be optimized, such as binary of MySQL, after these problems are optimized, Proxy dataSource can be used.
- public DataSource generateShardingSphereDataSourceFromProxy() throws SQLException {
+ public DataSource generateShardingSphereDataSourceFromProxy() {
Awaitility.await().atMost(5L, TimeUnit.SECONDS).pollInterval(1L, TimeUnit.SECONDS).until(() -> null != getYamlRootConfig().getRules());
YamlRootConfiguration rootConfig = getYamlRootConfig();
ShardingSpherePreconditions.checkNotNull(rootConfig.getDataSources(), () -> new IllegalStateException("dataSources is null"));
diff --git a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/cdc/CDCE2EIT.java b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/cdc/CDCE2EIT.java
index 4aeacfff4e6..cd47ac95c63 100644
--- a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/cdc/CDCE2EIT.java
+++ b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/cdc/CDCE2EIT.java
@@ -49,6 +49,7 @@ import org.apache.shardingsphere.test.e2e.data.pipeline.framework.param.Pipeline
import org.apache.shardingsphere.test.e2e.data.pipeline.framework.param.PipelineE2ESettings.PipelineE2EDatabaseSettings;
import org.apache.shardingsphere.test.e2e.data.pipeline.framework.param.PipelineE2ETestCaseArgumentsProvider;
import org.apache.shardingsphere.test.e2e.data.pipeline.framework.param.PipelineTestParameter;
+import org.apache.shardingsphere.test.e2e.data.pipeline.util.AwaitTimeoutUtil;
import org.apache.shardingsphere.test.e2e.data.pipeline.util.DataSourceExecuteUtils;
import org.apache.shardingsphere.test.e2e.env.container.atomic.constants.ProxyContainerConstants;
import org.junit.jupiter.api.condition.EnabledIf;
@@ -105,8 +106,8 @@ class CDCE2EIT {
for (String each : Arrays.asList(PipelineContainerComposer.DS_0, PipelineContainerComposer.DS_1)) {
containerComposer.registerStorageUnit(each);
}
- Awaitility.await().atMost(10L, TimeUnit.SECONDS).pollInterval(1, TimeUnit.SECONDS).until(() -> containerComposer.showStorageUnitsName()
- .containsAll(Arrays.asList(PipelineContainerComposer.DS_0, PipelineContainerComposer.DS_1)));
+ Awaitility.await().ignoreExceptions().atMost(AwaitTimeoutUtil.getTimeout(containerComposer.getDatabaseType()), TimeUnit.SECONDS).pollInterval(1, TimeUnit.SECONDS)
+ .until(() -> containerComposer.showStorageUnitsName().containsAll(Arrays.asList(PipelineContainerComposer.DS_0, PipelineContainerComposer.DS_1)));
createOrderTableRule(containerComposer);
try (Connection connection = containerComposer.getProxyDataSource().getConnection()) {
initSchemaAndTable(containerComposer, connection, 3);
@@ -150,7 +151,8 @@ class CDCE2EIT {
}
private void createOrderTableRule(final PipelineContainerComposer containerComposer) throws SQLException {
- containerComposer.proxyExecuteWithLog(CREATE_SHARDING_RULE_SQL, 2);
+ containerComposer.proxyExecuteWithLog(CREATE_SHARDING_RULE_SQL, 0);
+ Awaitility.await().atMost(20L, TimeUnit.SECONDS).pollInterval(2L, TimeUnit.SECONDS).until(() -> !containerComposer.queryForListWithLog("SHOW SHARDING TABLE RULE t_order").isEmpty());
}
private void initSchemaAndTable(final PipelineContainerComposer containerComposer, final Connection connection, final int sleepSeconds) throws SQLException {
diff --git a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/AbstractMigrationE2EIT.java b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/AbstractMigrationE2EIT.java
index 2750dc6850f..c6baade542a 100644
--- a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/AbstractMigrationE2EIT.java
+++ b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/AbstractMigrationE2EIT.java
@@ -24,6 +24,7 @@ import org.apache.shardingsphere.test.e2e.data.pipeline.cases.PipelineContainerC
import org.apache.shardingsphere.test.e2e.data.pipeline.command.MigrationDistSQLCommand;
import org.apache.shardingsphere.test.e2e.data.pipeline.env.PipelineE2EEnvironment;
import org.apache.shardingsphere.test.e2e.data.pipeline.env.enums.PipelineEnvTypeEnum;
+import org.apache.shardingsphere.test.e2e.data.pipeline.util.AwaitTimeoutUtil;
import org.apache.shardingsphere.test.e2e.env.container.atomic.util.DatabaseTypeUtils;
import org.awaitility.Awaitility;
import org.opengauss.util.PSQLException;
@@ -74,7 +75,8 @@ public abstract class AbstractMigrationE2EIT {
.replace("${ds3}", containerComposer.getActualJdbcUrlTemplate(PipelineContainerComposer.DS_3, true))
.replace("${ds4}", containerComposer.getActualJdbcUrlTemplate(PipelineContainerComposer.DS_4, true));
containerComposer.proxyExecuteWithLog(addTargetResource, 0);
- Awaitility.await().atMost(5L, TimeUnit.SECONDS).pollInterval(500L, TimeUnit.MILLISECONDS).until(() -> 3 == containerComposer.showStorageUnitsName().size());
+ Awaitility.await().ignoreExceptions().atMost(AwaitTimeoutUtil.getTimeout(containerComposer.getDatabaseType()), TimeUnit.SECONDS).pollInterval(1L, TimeUnit.SECONDS)
+ .until(() -> 3 == containerComposer.showStorageUnitsName().size());
}
protected void createSourceSchema(final PipelineContainerComposer containerComposer, final String schemaName) throws SQLException {
@@ -102,7 +104,8 @@ public abstract class AbstractMigrationE2EIT {
protected void createTargetOrderTableRule(final PipelineContainerComposer containerComposer) throws SQLException {
containerComposer.proxyExecuteWithLog(migrationDistSQL.getCreateTargetOrderTableRule(), 0);
- Awaitility.await().atMost(4L, TimeUnit.SECONDS).pollInterval(500L, TimeUnit.MILLISECONDS).until(() -> !containerComposer.queryForListWithLog("SHOW SHARDING TABLE RULE t_order").isEmpty());
+ Awaitility.await().atMost(AwaitTimeoutUtil.getTimeout(containerComposer.getDatabaseType()), TimeUnit.SECONDS).pollInterval(1L, TimeUnit.SECONDS)
+ .until(() -> !containerComposer.queryForListWithLog("SHOW SHARDING TABLE RULE t_order").isEmpty());
}
protected void createTargetOrderTableEncryptRule(final PipelineContainerComposer containerComposer) throws SQLException {
@@ -111,7 +114,7 @@ public abstract class AbstractMigrationE2EIT {
protected void createTargetOrderItemTableRule(final PipelineContainerComposer containerComposer) throws SQLException {
containerComposer.proxyExecuteWithLog(migrationDistSQL.getCreateTargetOrderItemTableRule(), 0);
- Awaitility.await().atMost(4L, TimeUnit.SECONDS).pollInterval(500L, TimeUnit.MILLISECONDS)
+ Awaitility.await().atMost(AwaitTimeoutUtil.getTimeout(containerComposer.getDatabaseType()), TimeUnit.SECONDS).pollInterval(1L, TimeUnit.SECONDS)
.until(() -> !containerComposer.queryForListWithLog("SHOW SHARDING TABLE RULE t_order_item").isEmpty());
}
diff --git a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/primarykey/IndexesMigrationE2EIT.java b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/primarykey/IndexesMigrationE2EIT.java
index 5a61df80f40..d91a6e22541 100644
--- a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/primarykey/IndexesMigrationE2EIT.java
+++ b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/primarykey/IndexesMigrationE2EIT.java
@@ -91,13 +91,12 @@ class IndexesMigrationE2EIT extends AbstractMigrationE2EIT {
KeyGenerateAlgorithm keyGenerateAlgorithm = new UUIDKeyGenerateAlgorithm();
// TODO PostgreSQL update delete events not support if table without unique keys at increment task.
final Consumer<DataSource> incrementalTaskFn = dataSource -> {
- Object orderId = keyGenerateAlgorithm.generateKey();
- insertOneOrder(containerComposer, orderId);
if (containerComposer.getDatabaseType() instanceof MySQLDatabaseType) {
- updateOneOrder(containerComposer, orderId, "updated");
- deleteOneOrder(containerComposer, orderId, "updated");
- insertOneOrder(containerComposer, keyGenerateAlgorithm.generateKey());
+ doCreateUpdateDelete(containerComposer, keyGenerateAlgorithm.generateKey());
}
+ Object orderId = keyGenerateAlgorithm.generateKey();
+ insertOneOrder(containerComposer, orderId);
+ containerComposer.assertOrderRecordExist(dataSource, "t_order", orderId);
};
assertMigrationSuccess(containerComposer, sql, "user_id", keyGenerateAlgorithm, consistencyCheckAlgorithmType, incrementalTaskFn);
}
diff --git a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/util/AwaitTimeoutUtil.java b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/util/AwaitTimeoutUtil.java
new file mode 100644
index 00000000000..37f89ac7f8a
--- /dev/null
+++ b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/util/AwaitTimeoutUtil.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.test.e2e.data.pipeline.util;
+
+import org.apache.shardingsphere.infra.database.type.DatabaseType;
+import org.apache.shardingsphere.infra.database.type.dialect.OpenGaussDatabaseType;
+
+public final class AwaitTimeoutUtil {
+
+ /**
+ * Get timeout, openGauss metadata reload slowly need special handling.
+ *
+ * @param databaseType database type.
+ * @return timeout
+ */
+ public static long getTimeout(final DatabaseType databaseType) {
+ return databaseType instanceof OpenGaussDatabaseType ? 60 : 10;
+ }
+}