You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@shardingsphere.apache.org by pa...@apache.org on 2023/02/19 12:45:02 UTC

[shardingsphere] branch master updated: Add faked MariaDBMigrationE2EIT (#24242)

This is an automated email from the ASF dual-hosted git repository.

panjuan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new c1ef99c4aba Add faked MariaDBMigrationE2EIT (#24242)
c1ef99c4aba is described below

commit c1ef99c4aba3d7e9d8f40fdeece3cb79eb66e305
Author: Hongsheng Zhong <zh...@apache.org>
AuthorDate: Sun Feb 19 20:44:51 2023 +0800

    Add faked MariaDBMigrationE2EIT (#24242)
---
 .../pipeline/cases/base/PipelineBaseE2EIT.java     | 22 +++--
 .../general/PostgreSQLMigrationGeneralE2EIT.java   |  2 +-
 .../primarykey/IndexesMigrationE2EIT.java          | 30 +++----
 .../primarykey/MariaDBMigrationE2EIT.java          | 98 ++++++++++++++++++++++
 .../api/impl/ConsistencyCheckJobAPITest.java       |  4 +-
 .../migration}/api/impl/MigrationJobAPITest.java   | 15 +++-
 6 files changed, 138 insertions(+), 33 deletions(-)

diff --git a/test/e2e/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/base/PipelineBaseE2EIT.java b/test/e2e/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/base/PipelineBaseE2EIT.java
index 8cdfb189541..9e7df56b4f3 100644
--- a/test/e2e/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/base/PipelineBaseE2EIT.java
+++ b/test/e2e/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/base/PipelineBaseE2EIT.java
@@ -341,10 +341,15 @@ public abstract class PipelineBaseE2EIT {
         return Collections.emptyList();
     }
     
-    protected void assertProxyOrderRecordExist(final Object id, final String tableName) {
+    protected void assertProxyOrderRecordExist(final String tableName, final Object orderId) {
         boolean recordExist = false;
+        String sql;
+        if (orderId instanceof String) {
+            sql = String.format("SELECT 1 FROM %s WHERE order_id = '%s'", tableName, orderId);
+        } else {
+            sql = String.format("SELECT 1 FROM %s WHERE order_id = %s", tableName, orderId);
+        }
         for (int i = 0; i < 5; i++) {
-            String sql = String.format("select * from %s where order_id = %s", tableName, id);
             List<Map<String, Object>> result = queryForListWithLog(sql);
             recordExist = !result.isEmpty();
             if (recordExist) {
@@ -355,10 +360,15 @@ public abstract class PipelineBaseE2EIT {
         assertTrue("The insert record must exist after the stop", recordExist);
     }
     
+    protected int getTargetTableRecordsCount(final String tableName) {
+        List<Map<String, Object>> targetList = queryForListWithLog("SELECT COUNT(1) AS count FROM " + tableName);
+        assertFalse(targetList.isEmpty());
+        return ((Number) targetList.get(0).get("count")).intValue();
+    }
+    
     protected void assertGreaterThanOrderTableInitRows(final int tableInitRows, final String schema) {
-        String countSQL = Strings.isNullOrEmpty(schema) ? "SELECT COUNT(*) as count FROM t_order" : String.format("SELECT COUNT(*) as count FROM %s.t_order", schema);
-        Map<String, Object> actual = queryForListWithLog(countSQL).get(0);
-        log.info("actual count {}", actual.get("count"));
-        assertTrue("actual count " + actual.get("count"), Integer.parseInt(actual.get("count").toString()) > tableInitRows);
+        String tableName = Strings.isNullOrEmpty(schema) ? "t_order" : String.format("%s.t_order", schema);
+        int recordsCount = getTargetTableRecordsCount(tableName);
+        assertTrue("actual count " + recordsCount, recordsCount > tableInitRows);
     }
 }
diff --git a/test/e2e/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/general/PostgreSQLMigrationGeneralE2EIT.java b/test/e2e/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/general/PostgreSQLMigrationGeneralE2EIT.java
index d9fd0aa96fe..0cc156aad9f 100644
--- a/test/e2e/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/general/PostgreSQLMigrationGeneralE2EIT.java
+++ b/test/e2e/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/general/PostgreSQLMigrationGeneralE2EIT.java
@@ -118,7 +118,7 @@ public final class PostgreSQLMigrationGeneralE2EIT extends AbstractMigrationE2EI
         startMigrationByJobId(jobId);
         // must refresh firstly, otherwise proxy can't get schema and table info
         proxyExecuteWithLog("REFRESH TABLE METADATA;", 2);
-        assertProxyOrderRecordExist(recordId, String.join(".", PipelineBaseE2EIT.SCHEMA_NAME, getTargetTableOrderName()));
+        assertProxyOrderRecordExist(String.join(".", PipelineBaseE2EIT.SCHEMA_NAME, getTargetTableOrderName()), recordId);
         assertCheckMigrationSuccess(jobId, "DATA_MATCH");
     }
     
diff --git a/test/e2e/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/primarykey/IndexesMigrationE2EIT.java b/test/e2e/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/primarykey/IndexesMigrationE2EIT.java
index eef05692249..a1324bfb6a7 100644
--- a/test/e2e/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/primarykey/IndexesMigrationE2EIT.java
+++ b/test/e2e/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/primarykey/IndexesMigrationE2EIT.java
@@ -37,11 +37,9 @@ import java.sql.SQLException;
 import java.util.Collection;
 import java.util.LinkedList;
 import java.util.List;
-import java.util.Map;
 
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.Matchers.is;
-import static org.junit.Assert.assertFalse;
 
 /**
  * E2E IT for different types of indexes, includes:
@@ -52,11 +50,8 @@ import static org.junit.Assert.assertFalse;
 @Slf4j
 public final class IndexesMigrationE2EIT extends AbstractMigrationE2EIT {
     
-    private final PipelineTestParameter testParam;
-    
     public IndexesMigrationE2EIT(final PipelineTestParameter testParam) {
         super(testParam);
-        this.testParam = testParam;
     }
     
     @Parameters(name = "{0}")
@@ -65,9 +60,11 @@ public final class IndexesMigrationE2EIT extends AbstractMigrationE2EIT {
         if (PipelineBaseE2EIT.ENV.getItEnvType() == PipelineEnvTypeEnum.NONE) {
             return result;
         }
-        for (String version : PipelineBaseE2EIT.ENV.listStorageContainerImages(new MySQLDatabaseType())) {
-            result.add(new PipelineTestParameter(new MySQLDatabaseType(), version, "env/common/none.xml"));
+        List<String> versions = PipelineBaseE2EIT.ENV.listStorageContainerImages(new MySQLDatabaseType());
+        if (versions.isEmpty()) {
+            return result;
         }
+        result.add(new PipelineTestParameter(new MySQLDatabaseType(), versions.get(0), "env/common/none.xml"));
         return result;
     }
     
@@ -131,8 +128,8 @@ public final class IndexesMigrationE2EIT extends AbstractMigrationE2EIT {
     }
     
     private void assertMigrationSuccess(final String sqlPattern, final String consistencyCheckAlgorithmType) throws SQLException, InterruptedException {
-        initEnvironment(testParam.getDatabaseType(), new MigrationJobType());
-        createSourceOrderTable(sqlPattern);
+        initEnvironment(getDatabaseType(), new MigrationJobType());
+        sourceExecuteWithLog(String.format(sqlPattern, getSourceTableOrderName()));
         try (Connection connection = getSourceDataSource().getConnection()) {
             KeyGenerateAlgorithm generateAlgorithm = new UUIDKeyGenerateAlgorithm();
             PipelineCaseHelper.batchInsertOrderRecordsWithGeneralColumns(connection, generateAlgorithm, getSourceTableOrderName(), PipelineBaseE2EIT.TABLE_INIT_ROW_COUNT);
@@ -143,22 +140,15 @@ public final class IndexesMigrationE2EIT extends AbstractMigrationE2EIT {
         createTargetOrderTableRule();
         startMigration(getSourceTableOrderName(), getTargetTableOrderName());
         String jobId = listJobId().get(0);
+        waitJobPrepareSuccess(String.format("SHOW MIGRATION STATUS '%s'", jobId));
+        sourceExecuteWithLog("INSERT INTO t_order (order_id, user_id, status) VALUES ('a1', 1, 'OK')");
+        assertProxyOrderRecordExist("t_order", "a1");
         waitIncrementTaskFinished(String.format("SHOW MIGRATION STATUS '%s'", jobId));
         assertCheckMigrationSuccess(jobId, consistencyCheckAlgorithmType);
         commitMigrationByJobId(jobId);
         proxyExecuteWithLog("REFRESH TABLE METADATA", 1);
-        assertTargetAndSourceCountAreSame();
+        assertThat(getTargetTableRecordsCount(getSourceTableOrderName()), is(PipelineBaseE2EIT.TABLE_INIT_ROW_COUNT + 1));
         List<String> lastJobIds = listJobId();
         assertThat(lastJobIds.size(), is(0));
     }
-    
-    private void createSourceOrderTable(final String sqlPattern) throws SQLException {
-        sourceExecuteWithLog(String.format(sqlPattern, getSourceTableOrderName()));
-    }
-    
-    private void assertTargetAndSourceCountAreSame() {
-        List<Map<String, Object>> targetList = queryForListWithLog("SELECT COUNT(*) AS count FROM t_order");
-        assertFalse(targetList.isEmpty());
-        assertThat(Integer.parseInt(targetList.get(0).get("count").toString()), is(PipelineBaseE2EIT.TABLE_INIT_ROW_COUNT));
-    }
 }
diff --git a/test/e2e/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/primarykey/MariaDBMigrationE2EIT.java b/test/e2e/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/primarykey/MariaDBMigrationE2EIT.java
new file mode 100644
index 00000000000..8de9e8d04cf
--- /dev/null
+++ b/test/e2e/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/primarykey/MariaDBMigrationE2EIT.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.test.e2e.data.pipeline.cases.migration.primarykey;
+
+import lombok.extern.slf4j.Slf4j;
+import org.apache.shardingsphere.data.pipeline.scenario.migration.MigrationJobType;
+import org.apache.shardingsphere.infra.database.type.dialect.MySQLDatabaseType;
+import org.apache.shardingsphere.sharding.algorithm.keygen.UUIDKeyGenerateAlgorithm;
+import org.apache.shardingsphere.sharding.spi.KeyGenerateAlgorithm;
+import org.apache.shardingsphere.test.e2e.data.pipeline.cases.base.PipelineBaseE2EIT;
+import org.apache.shardingsphere.test.e2e.data.pipeline.cases.migration.AbstractMigrationE2EIT;
+import org.apache.shardingsphere.test.e2e.data.pipeline.env.enums.PipelineEnvTypeEnum;
+import org.apache.shardingsphere.test.e2e.data.pipeline.framework.helper.PipelineCaseHelper;
+import org.apache.shardingsphere.test.e2e.data.pipeline.framework.param.PipelineTestParameter;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameters;
+
+import java.sql.Connection;
+import java.sql.SQLException;
+import java.util.Collection;
+import java.util.LinkedList;
+import java.util.List;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.is;
+
+@RunWith(Parameterized.class)
+@Slf4j
+public final class MariaDBMigrationE2EIT extends AbstractMigrationE2EIT {
+    
+    public MariaDBMigrationE2EIT(final PipelineTestParameter testParam) {
+        super(testParam);
+    }
+    
+    @Parameters(name = "{0}")
+    public static Collection<PipelineTestParameter> getTestParameters() {
+        Collection<PipelineTestParameter> result = new LinkedList<>();
+        if (PipelineBaseE2EIT.ENV.getItEnvType() == PipelineEnvTypeEnum.NONE) {
+            return result;
+        }
+        List<String> versions = PipelineBaseE2EIT.ENV.listStorageContainerImages(new MySQLDatabaseType());
+        if (versions.isEmpty()) {
+            return result;
+        }
+        // TODO use MariaDBDatabaseType
+        result.add(new PipelineTestParameter(new MySQLDatabaseType(), versions.get(0), "env/common/none.xml"));
+        return result;
+    }
+    
+    @Override
+    protected String getSourceTableOrderName() {
+        return "t_order";
+    }
+    
+    @Test
+    public void assertMigrationSuccess() throws SQLException, InterruptedException {
+        initEnvironment(getDatabaseType(), new MigrationJobType());
+        String sqlPattern = "CREATE TABLE `%s` (`order_id` VARCHAR(64) NOT NULL, `user_id` INT NOT NULL, `status` varchar(255), PRIMARY KEY (`order_id`)) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4";
+        sourceExecuteWithLog(String.format(sqlPattern, getSourceTableOrderName()));
+        try (Connection connection = getSourceDataSource().getConnection()) {
+            KeyGenerateAlgorithm generateAlgorithm = new UUIDKeyGenerateAlgorithm();
+            PipelineCaseHelper.batchInsertOrderRecordsWithGeneralColumns(connection, generateAlgorithm, getSourceTableOrderName(), PipelineBaseE2EIT.TABLE_INIT_ROW_COUNT);
+        }
+        addMigrationProcessConfig();
+        addMigrationSourceResource();
+        addMigrationTargetResource();
+        createTargetOrderTableRule();
+        startMigration(getSourceTableOrderName(), getTargetTableOrderName());
+        String jobId = listJobId().get(0);
+        waitJobPrepareSuccess(String.format("SHOW MIGRATION STATUS '%s'", jobId));
+        sourceExecuteWithLog("INSERT INTO t_order (order_id, user_id, status) VALUES ('a1', 1, 'OK')");
+        assertProxyOrderRecordExist("t_order", "a1");
+        waitIncrementTaskFinished(String.format("SHOW MIGRATION STATUS '%s'", jobId));
+        assertCheckMigrationSuccess(jobId, "CRC32_MATCH");
+        commitMigrationByJobId(jobId);
+        proxyExecuteWithLog("REFRESH TABLE METADATA", 1);
+        assertThat(getTargetTableRecordsCount(getSourceTableOrderName()), is(PipelineBaseE2EIT.TABLE_INIT_ROW_COUNT + 1));
+        List<String> lastJobIds = listJobId();
+        assertThat(lastJobIds.size(), is(0));
+    }
+}
diff --git a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/api/impl/ConsistencyCheckJobAPITest.java b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/consistencycheck/api/impl/ConsistencyCheckJobAPITest.java
similarity index 97%
rename from test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/api/impl/ConsistencyCheckJobAPITest.java
rename to test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/consistencycheck/api/impl/ConsistencyCheckJobAPITest.java
index 8dd45ba470d..977c56ca3db 100644
--- a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/api/impl/ConsistencyCheckJobAPITest.java
+++ b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/consistencycheck/api/impl/ConsistencyCheckJobAPITest.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.shardingsphere.test.it.data.pipeline.core.api.impl;
+package org.apache.shardingsphere.test.it.data.pipeline.scenario.consistencycheck.api.impl;
 
 import org.apache.shardingsphere.data.pipeline.api.check.consistency.DataConsistencyCheckResult;
 import org.apache.shardingsphere.data.pipeline.api.check.consistency.DataConsistencyContentCheckResult;
@@ -63,7 +63,7 @@ public final class ConsistencyCheckJobAPITest {
     public void assertCreateJobConfig() {
         String migrationJobId = "j0101test";
         String checkJobId = checkJobAPI.createJobAndStart(new CreateConsistencyCheckJobParameter(migrationJobId, null, null));
-        ConsistencyCheckJobConfiguration jobConfig = (ConsistencyCheckJobConfiguration) checkJobAPI.getJobConfiguration(checkJobId);
+        ConsistencyCheckJobConfiguration jobConfig = checkJobAPI.getJobConfiguration(checkJobId);
         int expectedSequence = ConsistencyCheckSequence.MIN_SEQUENCE;
         String expectCheckJobId = "j0201" + migrationJobId + expectedSequence;
         assertThat(jobConfig.getJobId(), is(expectCheckJobId));
diff --git a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/api/impl/MigrationJobAPITest.java b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/migration/api/impl/MigrationJobAPITest.java
similarity index 96%
rename from test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/api/impl/MigrationJobAPITest.java
rename to test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/migration/api/impl/MigrationJobAPITest.java
index 457f8496d07..fb1994ee3ed 100644
--- a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/api/impl/MigrationJobAPITest.java
+++ b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/migration/api/impl/MigrationJobAPITest.java
@@ -15,13 +15,14 @@
  * limitations under the License.
  */
 
-package org.apache.shardingsphere.test.it.data.pipeline.core.api.impl;
+package org.apache.shardingsphere.test.it.data.pipeline.scenario.migration.api.impl;
 
 import lombok.SneakyThrows;
 import org.apache.shardingsphere.data.pipeline.api.check.consistency.DataConsistencyCheckResult;
 import org.apache.shardingsphere.data.pipeline.api.check.consistency.DataConsistencyContentCheckResult;
 import org.apache.shardingsphere.data.pipeline.api.check.consistency.DataConsistencyCountCheckResult;
 import org.apache.shardingsphere.data.pipeline.api.config.job.MigrationJobConfiguration;
+import org.apache.shardingsphere.data.pipeline.api.datasource.PipelineDataSourceWrapper;
 import org.apache.shardingsphere.data.pipeline.api.datasource.config.PipelineDataSourceConfiguration;
 import org.apache.shardingsphere.data.pipeline.api.datasource.config.PipelineDataSourceConfigurationFactory;
 import org.apache.shardingsphere.data.pipeline.api.job.JobStatus;
@@ -39,6 +40,8 @@ import org.apache.shardingsphere.data.pipeline.scenario.migration.context.Migrat
 import org.apache.shardingsphere.data.pipeline.spi.check.consistency.DataConsistencyCalculateAlgorithm;
 import org.apache.shardingsphere.data.pipeline.spi.datasource.creator.PipelineDataSourceCreator;
 import org.apache.shardingsphere.elasticjob.infra.pojo.JobConfigurationPOJO;
+import org.apache.shardingsphere.infra.database.type.DatabaseType;
+import org.apache.shardingsphere.infra.database.type.DatabaseTypeEngine;
 import org.apache.shardingsphere.infra.datasource.pool.creator.DataSourcePoolCreator;
 import org.apache.shardingsphere.infra.datasource.props.DataSourceProperties;
 import org.apache.shardingsphere.infra.util.spi.type.typed.TypedSPILoader;
@@ -79,12 +82,16 @@ public final class MigrationJobAPITest {
     
     private static MigrationJobAPI jobAPI;
     
+    private static DatabaseType databaseType;
+    
     @BeforeClass
     public static void beforeClass() {
         PipelineContextUtil.mockModeConfigAndContextManager();
         jobAPI = new MigrationJobAPI();
+        String jdbcUrl = "jdbc:h2:mem:test_ds_0;DB_CLOSE_DELAY=-1;DATABASE_TO_UPPER=false;MODE=MySQL";
+        databaseType = DatabaseTypeEngine.getDatabaseType(jdbcUrl);
         Map<String, Object> props = new HashMap<>();
-        props.put("jdbcUrl", "jdbc:h2:mem:test_ds_0;DB_CLOSE_DELAY=-1;DATABASE_TO_UPPER=false;MODE=MySQL");
+        props.put("jdbcUrl", jdbcUrl);
         props.put("username", "root");
         props.put("password", "root");
         Map<String, DataSourceProperties> expect = new LinkedHashMap<>(1, 1);
@@ -279,12 +286,12 @@ public final class MigrationJobAPITest {
     private void initIntPrimaryEnvironment() throws SQLException {
         Map<String, DataSourceProperties> metaDataDataSource = new PipelineDataSourcePersistService().load(new MigrationJobType());
         DataSourceProperties dataSourceProps = metaDataDataSource.get("ds_0");
-        DataSource dataSource = DataSourcePoolCreator.create(dataSourceProps);
         try (
+                PipelineDataSourceWrapper dataSource = new PipelineDataSourceWrapper(DataSourcePoolCreator.create(dataSourceProps), databaseType);
                 Connection connection = dataSource.getConnection();
                 Statement statement = connection.createStatement()) {
             statement.execute("DROP TABLE IF EXISTS t_order");
-            statement.execute("CREATE TABLE t_order (order_id INT PRIMARY KEY, user_id int(10))");
+            statement.execute("CREATE TABLE t_order (order_id INT PRIMARY KEY, user_id INT)");
         }
     }