You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@shardingsphere.apache.org by pa...@apache.org on 2023/02/19 12:45:02 UTC
[shardingsphere] branch master updated: Add faked MariaDBMigrationE2EIT (#24242)
This is an automated email from the ASF dual-hosted git repository.
panjuan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new c1ef99c4aba Add faked MariaDBMigrationE2EIT (#24242)
c1ef99c4aba is described below
commit c1ef99c4aba3d7e9d8f40fdeece3cb79eb66e305
Author: Hongsheng Zhong <zh...@apache.org>
AuthorDate: Sun Feb 19 20:44:51 2023 +0800
Add faked MariaDBMigrationE2EIT (#24242)
---
.../pipeline/cases/base/PipelineBaseE2EIT.java | 22 +++--
.../general/PostgreSQLMigrationGeneralE2EIT.java | 2 +-
.../primarykey/IndexesMigrationE2EIT.java | 30 +++----
.../primarykey/MariaDBMigrationE2EIT.java | 98 ++++++++++++++++++++++
.../api/impl/ConsistencyCheckJobAPITest.java | 4 +-
.../migration}/api/impl/MigrationJobAPITest.java | 15 +++-
6 files changed, 138 insertions(+), 33 deletions(-)
diff --git a/test/e2e/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/base/PipelineBaseE2EIT.java b/test/e2e/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/base/PipelineBaseE2EIT.java
index 8cdfb189541..9e7df56b4f3 100644
--- a/test/e2e/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/base/PipelineBaseE2EIT.java
+++ b/test/e2e/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/base/PipelineBaseE2EIT.java
@@ -341,10 +341,15 @@ public abstract class PipelineBaseE2EIT {
return Collections.emptyList();
}
- protected void assertProxyOrderRecordExist(final Object id, final String tableName) {
+ protected void assertProxyOrderRecordExist(final String tableName, final Object orderId) {
boolean recordExist = false;
+ String sql;
+ if (orderId instanceof String) {
+ sql = String.format("SELECT 1 FROM %s WHERE order_id = '%s'", tableName, orderId);
+ } else {
+ sql = String.format("SELECT 1 FROM %s WHERE order_id = %s", tableName, orderId);
+ }
for (int i = 0; i < 5; i++) {
- String sql = String.format("select * from %s where order_id = %s", tableName, id);
List<Map<String, Object>> result = queryForListWithLog(sql);
recordExist = !result.isEmpty();
if (recordExist) {
@@ -355,10 +360,15 @@ public abstract class PipelineBaseE2EIT {
assertTrue("The insert record must exist after the stop", recordExist);
}
+ protected int getTargetTableRecordsCount(final String tableName) {
+ List<Map<String, Object>> targetList = queryForListWithLog("SELECT COUNT(1) AS count FROM " + tableName);
+ assertFalse(targetList.isEmpty());
+ return ((Number) targetList.get(0).get("count")).intValue();
+ }
+
protected void assertGreaterThanOrderTableInitRows(final int tableInitRows, final String schema) {
- String countSQL = Strings.isNullOrEmpty(schema) ? "SELECT COUNT(*) as count FROM t_order" : String.format("SELECT COUNT(*) as count FROM %s.t_order", schema);
- Map<String, Object> actual = queryForListWithLog(countSQL).get(0);
- log.info("actual count {}", actual.get("count"));
- assertTrue("actual count " + actual.get("count"), Integer.parseInt(actual.get("count").toString()) > tableInitRows);
+ String tableName = Strings.isNullOrEmpty(schema) ? "t_order" : String.format("%s.t_order", schema);
+ int recordsCount = getTargetTableRecordsCount(tableName);
+ assertTrue("actual count " + recordsCount, recordsCount > tableInitRows);
}
}
diff --git a/test/e2e/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/general/PostgreSQLMigrationGeneralE2EIT.java b/test/e2e/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/general/PostgreSQLMigrationGeneralE2EIT.java
index d9fd0aa96fe..0cc156aad9f 100644
--- a/test/e2e/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/general/PostgreSQLMigrationGeneralE2EIT.java
+++ b/test/e2e/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/general/PostgreSQLMigrationGeneralE2EIT.java
@@ -118,7 +118,7 @@ public final class PostgreSQLMigrationGeneralE2EIT extends AbstractMigrationE2EI
startMigrationByJobId(jobId);
// must refresh firstly, otherwise proxy can't get schema and table info
proxyExecuteWithLog("REFRESH TABLE METADATA;", 2);
- assertProxyOrderRecordExist(recordId, String.join(".", PipelineBaseE2EIT.SCHEMA_NAME, getTargetTableOrderName()));
+ assertProxyOrderRecordExist(String.join(".", PipelineBaseE2EIT.SCHEMA_NAME, getTargetTableOrderName()), recordId);
assertCheckMigrationSuccess(jobId, "DATA_MATCH");
}
diff --git a/test/e2e/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/primarykey/IndexesMigrationE2EIT.java b/test/e2e/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/primarykey/IndexesMigrationE2EIT.java
index eef05692249..a1324bfb6a7 100644
--- a/test/e2e/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/primarykey/IndexesMigrationE2EIT.java
+++ b/test/e2e/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/primarykey/IndexesMigrationE2EIT.java
@@ -37,11 +37,9 @@ import java.sql.SQLException;
import java.util.Collection;
import java.util.LinkedList;
import java.util.List;
-import java.util.Map;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.is;
-import static org.junit.Assert.assertFalse;
/**
* E2E IT for different types of indexes, includes:
@@ -52,11 +50,8 @@ import static org.junit.Assert.assertFalse;
@Slf4j
public final class IndexesMigrationE2EIT extends AbstractMigrationE2EIT {
- private final PipelineTestParameter testParam;
-
public IndexesMigrationE2EIT(final PipelineTestParameter testParam) {
super(testParam);
- this.testParam = testParam;
}
@Parameters(name = "{0}")
@@ -65,9 +60,11 @@ public final class IndexesMigrationE2EIT extends AbstractMigrationE2EIT {
if (PipelineBaseE2EIT.ENV.getItEnvType() == PipelineEnvTypeEnum.NONE) {
return result;
}
- for (String version : PipelineBaseE2EIT.ENV.listStorageContainerImages(new MySQLDatabaseType())) {
- result.add(new PipelineTestParameter(new MySQLDatabaseType(), version, "env/common/none.xml"));
+ List<String> versions = PipelineBaseE2EIT.ENV.listStorageContainerImages(new MySQLDatabaseType());
+ if (versions.isEmpty()) {
+ return result;
}
+ result.add(new PipelineTestParameter(new MySQLDatabaseType(), versions.get(0), "env/common/none.xml"));
return result;
}
@@ -131,8 +128,8 @@ public final class IndexesMigrationE2EIT extends AbstractMigrationE2EIT {
}
private void assertMigrationSuccess(final String sqlPattern, final String consistencyCheckAlgorithmType) throws SQLException, InterruptedException {
- initEnvironment(testParam.getDatabaseType(), new MigrationJobType());
- createSourceOrderTable(sqlPattern);
+ initEnvironment(getDatabaseType(), new MigrationJobType());
+ sourceExecuteWithLog(String.format(sqlPattern, getSourceTableOrderName()));
try (Connection connection = getSourceDataSource().getConnection()) {
KeyGenerateAlgorithm generateAlgorithm = new UUIDKeyGenerateAlgorithm();
PipelineCaseHelper.batchInsertOrderRecordsWithGeneralColumns(connection, generateAlgorithm, getSourceTableOrderName(), PipelineBaseE2EIT.TABLE_INIT_ROW_COUNT);
@@ -143,22 +140,15 @@ public final class IndexesMigrationE2EIT extends AbstractMigrationE2EIT {
createTargetOrderTableRule();
startMigration(getSourceTableOrderName(), getTargetTableOrderName());
String jobId = listJobId().get(0);
+ waitJobPrepareSuccess(String.format("SHOW MIGRATION STATUS '%s'", jobId));
+ sourceExecuteWithLog("INSERT INTO t_order (order_id, user_id, status) VALUES ('a1', 1, 'OK')");
+ assertProxyOrderRecordExist("t_order", "a1");
waitIncrementTaskFinished(String.format("SHOW MIGRATION STATUS '%s'", jobId));
assertCheckMigrationSuccess(jobId, consistencyCheckAlgorithmType);
commitMigrationByJobId(jobId);
proxyExecuteWithLog("REFRESH TABLE METADATA", 1);
- assertTargetAndSourceCountAreSame();
+ assertThat(getTargetTableRecordsCount(getSourceTableOrderName()), is(PipelineBaseE2EIT.TABLE_INIT_ROW_COUNT + 1));
List<String> lastJobIds = listJobId();
assertThat(lastJobIds.size(), is(0));
}
-
- private void createSourceOrderTable(final String sqlPattern) throws SQLException {
- sourceExecuteWithLog(String.format(sqlPattern, getSourceTableOrderName()));
- }
-
- private void assertTargetAndSourceCountAreSame() {
- List<Map<String, Object>> targetList = queryForListWithLog("SELECT COUNT(*) AS count FROM t_order");
- assertFalse(targetList.isEmpty());
- assertThat(Integer.parseInt(targetList.get(0).get("count").toString()), is(PipelineBaseE2EIT.TABLE_INIT_ROW_COUNT));
- }
}
diff --git a/test/e2e/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/primarykey/MariaDBMigrationE2EIT.java b/test/e2e/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/primarykey/MariaDBMigrationE2EIT.java
new file mode 100644
index 00000000000..8de9e8d04cf
--- /dev/null
+++ b/test/e2e/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/primarykey/MariaDBMigrationE2EIT.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.test.e2e.data.pipeline.cases.migration.primarykey;
+
+import lombok.extern.slf4j.Slf4j;
+import org.apache.shardingsphere.data.pipeline.scenario.migration.MigrationJobType;
+import org.apache.shardingsphere.infra.database.type.dialect.MySQLDatabaseType;
+import org.apache.shardingsphere.sharding.algorithm.keygen.UUIDKeyGenerateAlgorithm;
+import org.apache.shardingsphere.sharding.spi.KeyGenerateAlgorithm;
+import org.apache.shardingsphere.test.e2e.data.pipeline.cases.base.PipelineBaseE2EIT;
+import org.apache.shardingsphere.test.e2e.data.pipeline.cases.migration.AbstractMigrationE2EIT;
+import org.apache.shardingsphere.test.e2e.data.pipeline.env.enums.PipelineEnvTypeEnum;
+import org.apache.shardingsphere.test.e2e.data.pipeline.framework.helper.PipelineCaseHelper;
+import org.apache.shardingsphere.test.e2e.data.pipeline.framework.param.PipelineTestParameter;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameters;
+
+import java.sql.Connection;
+import java.sql.SQLException;
+import java.util.Collection;
+import java.util.LinkedList;
+import java.util.List;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.is;
+
+@RunWith(Parameterized.class)
+@Slf4j
+public final class MariaDBMigrationE2EIT extends AbstractMigrationE2EIT {
+
+ public MariaDBMigrationE2EIT(final PipelineTestParameter testParam) {
+ super(testParam);
+ }
+
+ @Parameters(name = "{0}")
+ public static Collection<PipelineTestParameter> getTestParameters() {
+ Collection<PipelineTestParameter> result = new LinkedList<>();
+ if (PipelineBaseE2EIT.ENV.getItEnvType() == PipelineEnvTypeEnum.NONE) {
+ return result;
+ }
+ List<String> versions = PipelineBaseE2EIT.ENV.listStorageContainerImages(new MySQLDatabaseType());
+ if (versions.isEmpty()) {
+ return result;
+ }
+ // TODO use MariaDBDatabaseType
+ result.add(new PipelineTestParameter(new MySQLDatabaseType(), versions.get(0), "env/common/none.xml"));
+ return result;
+ }
+
+ @Override
+ protected String getSourceTableOrderName() {
+ return "t_order";
+ }
+
+ @Test
+ public void assertMigrationSuccess() throws SQLException, InterruptedException {
+ initEnvironment(getDatabaseType(), new MigrationJobType());
+ String sqlPattern = "CREATE TABLE `%s` (`order_id` VARCHAR(64) NOT NULL, `user_id` INT NOT NULL, `status` varchar(255), PRIMARY KEY (`order_id`)) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4";
+ sourceExecuteWithLog(String.format(sqlPattern, getSourceTableOrderName()));
+ try (Connection connection = getSourceDataSource().getConnection()) {
+ KeyGenerateAlgorithm generateAlgorithm = new UUIDKeyGenerateAlgorithm();
+ PipelineCaseHelper.batchInsertOrderRecordsWithGeneralColumns(connection, generateAlgorithm, getSourceTableOrderName(), PipelineBaseE2EIT.TABLE_INIT_ROW_COUNT);
+ }
+ addMigrationProcessConfig();
+ addMigrationSourceResource();
+ addMigrationTargetResource();
+ createTargetOrderTableRule();
+ startMigration(getSourceTableOrderName(), getTargetTableOrderName());
+ String jobId = listJobId().get(0);
+ waitJobPrepareSuccess(String.format("SHOW MIGRATION STATUS '%s'", jobId));
+ sourceExecuteWithLog("INSERT INTO t_order (order_id, user_id, status) VALUES ('a1', 1, 'OK')");
+ assertProxyOrderRecordExist("t_order", "a1");
+ waitIncrementTaskFinished(String.format("SHOW MIGRATION STATUS '%s'", jobId));
+ assertCheckMigrationSuccess(jobId, "CRC32_MATCH");
+ commitMigrationByJobId(jobId);
+ proxyExecuteWithLog("REFRESH TABLE METADATA", 1);
+ assertThat(getTargetTableRecordsCount(getSourceTableOrderName()), is(PipelineBaseE2EIT.TABLE_INIT_ROW_COUNT + 1));
+ List<String> lastJobIds = listJobId();
+ assertThat(lastJobIds.size(), is(0));
+ }
+}
diff --git a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/api/impl/ConsistencyCheckJobAPITest.java b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/consistencycheck/api/impl/ConsistencyCheckJobAPITest.java
similarity index 97%
rename from test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/api/impl/ConsistencyCheckJobAPITest.java
rename to test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/consistencycheck/api/impl/ConsistencyCheckJobAPITest.java
index 8dd45ba470d..977c56ca3db 100644
--- a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/api/impl/ConsistencyCheckJobAPITest.java
+++ b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/consistencycheck/api/impl/ConsistencyCheckJobAPITest.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.shardingsphere.test.it.data.pipeline.core.api.impl;
+package org.apache.shardingsphere.test.it.data.pipeline.scenario.consistencycheck.api.impl;
import org.apache.shardingsphere.data.pipeline.api.check.consistency.DataConsistencyCheckResult;
import org.apache.shardingsphere.data.pipeline.api.check.consistency.DataConsistencyContentCheckResult;
@@ -63,7 +63,7 @@ public final class ConsistencyCheckJobAPITest {
public void assertCreateJobConfig() {
String migrationJobId = "j0101test";
String checkJobId = checkJobAPI.createJobAndStart(new CreateConsistencyCheckJobParameter(migrationJobId, null, null));
- ConsistencyCheckJobConfiguration jobConfig = (ConsistencyCheckJobConfiguration) checkJobAPI.getJobConfiguration(checkJobId);
+ ConsistencyCheckJobConfiguration jobConfig = checkJobAPI.getJobConfiguration(checkJobId);
int expectedSequence = ConsistencyCheckSequence.MIN_SEQUENCE;
String expectCheckJobId = "j0201" + migrationJobId + expectedSequence;
assertThat(jobConfig.getJobId(), is(expectCheckJobId));
diff --git a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/api/impl/MigrationJobAPITest.java b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/migration/api/impl/MigrationJobAPITest.java
similarity index 96%
rename from test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/api/impl/MigrationJobAPITest.java
rename to test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/migration/api/impl/MigrationJobAPITest.java
index 457f8496d07..fb1994ee3ed 100644
--- a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/api/impl/MigrationJobAPITest.java
+++ b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/migration/api/impl/MigrationJobAPITest.java
@@ -15,13 +15,14 @@
* limitations under the License.
*/
-package org.apache.shardingsphere.test.it.data.pipeline.core.api.impl;
+package org.apache.shardingsphere.test.it.data.pipeline.scenario.migration.api.impl;
import lombok.SneakyThrows;
import org.apache.shardingsphere.data.pipeline.api.check.consistency.DataConsistencyCheckResult;
import org.apache.shardingsphere.data.pipeline.api.check.consistency.DataConsistencyContentCheckResult;
import org.apache.shardingsphere.data.pipeline.api.check.consistency.DataConsistencyCountCheckResult;
import org.apache.shardingsphere.data.pipeline.api.config.job.MigrationJobConfiguration;
+import org.apache.shardingsphere.data.pipeline.api.datasource.PipelineDataSourceWrapper;
import org.apache.shardingsphere.data.pipeline.api.datasource.config.PipelineDataSourceConfiguration;
import org.apache.shardingsphere.data.pipeline.api.datasource.config.PipelineDataSourceConfigurationFactory;
import org.apache.shardingsphere.data.pipeline.api.job.JobStatus;
@@ -39,6 +40,8 @@ import org.apache.shardingsphere.data.pipeline.scenario.migration.context.Migrat
import org.apache.shardingsphere.data.pipeline.spi.check.consistency.DataConsistencyCalculateAlgorithm;
import org.apache.shardingsphere.data.pipeline.spi.datasource.creator.PipelineDataSourceCreator;
import org.apache.shardingsphere.elasticjob.infra.pojo.JobConfigurationPOJO;
+import org.apache.shardingsphere.infra.database.type.DatabaseType;
+import org.apache.shardingsphere.infra.database.type.DatabaseTypeEngine;
import org.apache.shardingsphere.infra.datasource.pool.creator.DataSourcePoolCreator;
import org.apache.shardingsphere.infra.datasource.props.DataSourceProperties;
import org.apache.shardingsphere.infra.util.spi.type.typed.TypedSPILoader;
@@ -79,12 +82,16 @@ public final class MigrationJobAPITest {
private static MigrationJobAPI jobAPI;
+ private static DatabaseType databaseType;
+
@BeforeClass
public static void beforeClass() {
PipelineContextUtil.mockModeConfigAndContextManager();
jobAPI = new MigrationJobAPI();
+ String jdbcUrl = "jdbc:h2:mem:test_ds_0;DB_CLOSE_DELAY=-1;DATABASE_TO_UPPER=false;MODE=MySQL";
+ databaseType = DatabaseTypeEngine.getDatabaseType(jdbcUrl);
Map<String, Object> props = new HashMap<>();
- props.put("jdbcUrl", "jdbc:h2:mem:test_ds_0;DB_CLOSE_DELAY=-1;DATABASE_TO_UPPER=false;MODE=MySQL");
+ props.put("jdbcUrl", jdbcUrl);
props.put("username", "root");
props.put("password", "root");
Map<String, DataSourceProperties> expect = new LinkedHashMap<>(1, 1);
@@ -279,12 +286,12 @@ public final class MigrationJobAPITest {
private void initIntPrimaryEnvironment() throws SQLException {
Map<String, DataSourceProperties> metaDataDataSource = new PipelineDataSourcePersistService().load(new MigrationJobType());
DataSourceProperties dataSourceProps = metaDataDataSource.get("ds_0");
- DataSource dataSource = DataSourcePoolCreator.create(dataSourceProps);
try (
+ PipelineDataSourceWrapper dataSource = new PipelineDataSourceWrapper(DataSourcePoolCreator.create(dataSourceProps), databaseType);
Connection connection = dataSource.getConnection();
Statement statement = connection.createStatement()) {
statement.execute("DROP TABLE IF EXISTS t_order");
- statement.execute("CREATE TABLE t_order (order_id INT PRIMARY KEY, user_id int(10))");
+ statement.execute("CREATE TABLE t_order (order_id INT PRIMARY KEY, user_id INT)");
}
}