You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@shardingsphere.apache.org by zh...@apache.org on 2022/05/15 04:12:43 UTC

[shardingsphere] branch master updated: Refactor Scaling IT code, prepare support multiple scenarios (#17637)

This is an automated email from the ASF dual-hosted git repository.

zhonghongsheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new 3235922e9a6 Refactor Scaling IT code, prepare support multiple scenarios (#17637)
3235922e9a6 is described below

commit 3235922e9a689b050635fd9070cb3b32e34bdec1
Author: azexcy <10...@users.noreply.github.com>
AuthorDate: Sun May 15 12:12:35 2022 +0800

    Refactor Scaling IT code, prepare support multiple scenarios (#17637)
    
    * Scaling IT code refactoring, for support multiple scenarios
    
    * Add part of the table scaling IT case
    
    * Fix codestyle
    
    * Remove useless code
    
    * Fix codestyles
---
 .../data/pipeline/cases/{ => base}/BaseITCase.java |  38 ++++++--
 .../cases/{mysql => base}/BaseMySQLITCase.java     |  46 ++++++----
 .../{openguass => base}/BaseOpenGaussITCase.java   |  38 ++++----
 .../{postgresql => base}/BasePostgreSQLITCase.java |  36 ++++----
 .../data/pipeline/cases/base/BaseTaskRunnable.java | 101 +++++++++++++++++++++
 .../pipeline/cases/command/CommonSQLCommand.java   |  18 +++-
 .../cases/common/SimpleIncrementTaskRunnable.java  |  62 +++++++++++++
 .../cases/mysql/MySQLIncrementTaskRunnable.java    |  79 ----------------
 .../pipeline/cases/mysql/MySQLManualScalingIT.java |  45 +++++----
 .../cases/openguass/OpenGaussManualScalingIT.java  |  51 +++++++----
 .../cases/postgresql/PostgreSQLDDLGeneratorIT.java |   7 +-
 .../PostgreSQLIncrementTaskRunnable.java           |  78 ----------------
 .../postgresql/PostgreSQLManualScalingIT.java      |  43 ++++++---
 .../scenario/ScalingScenario.java}                 |  30 +++---
 .../proxy/ShardingSphereProxyDockerContainer.java  |   2 -
 .../framework/helper/ScalingTableSQLHelper.java    |  75 +++++++++++++++
 .../framework/param/ScalingParameterized.java      |   2 +-
 .../data/pipeline/util/TableCrudUtil.java          |  12 ---
 .../src/test/resources/env/common/command.xml      |  28 ++++--
 .../manual/mysql/{ => integer_primary_key}/sql.xml |   0
 .../manual/mysql/{ => text_primary_key}/sql.xml    |  42 +--------
 .../postgresql/{ => integer_primary_key}/sql.xml   |   0
 .../postgresql/{ => text_primary_key}/sql.xml      |  37 +-------
 23 files changed, 479 insertions(+), 391 deletions(-)

diff --git a/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/BaseITCase.java b/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/base/BaseITCase.java
similarity index 88%
rename from shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/BaseITCase.java
rename to shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/base/BaseITCase.java
index a19c1eba1f4..2a12c7e1da3 100644
--- a/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/BaseITCase.java
+++ b/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/base/BaseITCase.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.shardingsphere.integration.data.pipeline.cases;
+package org.apache.shardingsphere.integration.data.pipeline.cases.base;
 
 import com.google.common.collect.Sets;
 import com.zaxxer.hikari.HikariDataSource;
@@ -143,14 +143,25 @@ public abstract class BaseITCase {
         }
     }
     
-    protected void initShardingRule() throws InterruptedException {
-        for (String sql : getCommonSQLCommand().getCreateShardingAlgorithm()) {
-            getJdbcTemplate().execute(sql);
-            // TODO sleep to wait for sharding algorithm table created,otherwise the next sql will fail.
-            TimeUnit.SECONDS.sleep(1);
-        }
-        jdbcTemplate.execute(getCommonSQLCommand().getCreateShardingTable());
+    protected void initShardingAlgorithm() {
+        jdbcTemplate.execute(getCommonSQLCommand().getCreateDatabaseShardingAlgorithm());
+        jdbcTemplate.execute(getCommonSQLCommand().getCreateOrderShardingAlgorithm());
+        jdbcTemplate.execute(getCommonSQLCommand().getCreateOrderItemShardingAlgorithm());
+    }
+    
+    protected void createAllSharingTableRule() {
+        jdbcTemplate.execute(commonSQLCommand.getCreateAllSharingTableRule());
+    }
+    
+    protected void createOrderSharingTableRule() {
+        jdbcTemplate.execute(commonSQLCommand.getCreateOrderShardingTableRule());
+    }
+    
+    protected void bindingShardingRule() {
         jdbcTemplate.execute("CREATE SHARDING BINDING TABLE RULES (t_order,t_order_item)");
+    }
+    
+    protected void createScalingRule() {
         jdbcTemplate.execute("CREATE SHARDING SCALING RULE scaling_manual (DATA_CONSISTENCY_CHECKER(TYPE(NAME=DATA_MATCH)))");
     }
     
@@ -158,6 +169,12 @@ public abstract class BaseITCase {
         jdbcTemplate.execute(String.format("CREATE SCHEMA %s", schemaName));
     }
     
+    protected void assertOriginalSourceSuccess() {
+        List<Map<String, Object>> previewResults = getJdbcTemplate().queryForList("PREVIEW SELECT COUNT(1) FROM t_order");
+        Set<Object> originalSources = previewResults.stream().map(each -> each.get("data_source_name")).collect(Collectors.toSet());
+        assertThat(originalSources, is(Sets.newHashSet("ds_0", "ds_1")));
+    }
+    
     /**
      * Check data match consistency.
      *
@@ -165,7 +182,7 @@ public abstract class BaseITCase {
      * @param jobId job id
      * @throws InterruptedException interrupted exception
      */
-    protected void checkMatchConsistency(final JdbcTemplate jdbcTemplate, final String jobId) throws InterruptedException {
+    protected void assertCheckMatchConsistencySuccess(final JdbcTemplate jdbcTemplate, final String jobId) throws InterruptedException {
         Map<String, String> actualStatusMap = new HashMap<>(2, 1);
         for (int i = 0; i < 100; i++) {
             List<Map<String, Object>> showScalingStatusResMap = jdbcTemplate.queryForList(String.format("SHOW SCALING STATUS %s", jobId));
@@ -204,6 +221,9 @@ public abstract class BaseITCase {
     
     @After
     public void stopContainer() {
+        if (composedContainer instanceof DockerComposedContainer) {
+            log.info(((DockerComposedContainer) composedContainer).getProxyContainer().getLogs());
+        }
         composedContainer.stop();
     }
 }
diff --git a/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/mysql/BaseMySQLITCase.java b/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/base/BaseMySQLITCase.java
similarity index 62%
rename from shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/mysql/BaseMySQLITCase.java
rename to shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/base/BaseMySQLITCase.java
index 6ea943a2518..f4fbac260f0 100644
--- a/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/mysql/BaseMySQLITCase.java
+++ b/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/base/BaseMySQLITCase.java
@@ -15,54 +15,62 @@
  * limitations under the License.
  */
 
-package org.apache.shardingsphere.integration.data.pipeline.cases.mysql;
+package org.apache.shardingsphere.integration.data.pipeline.cases.base;
 
+import lombok.Getter;
 import lombok.SneakyThrows;
-import org.apache.commons.lang3.tuple.Pair;
 import org.apache.shardingsphere.infra.database.type.DatabaseType;
 import org.apache.shardingsphere.infra.database.type.dialect.MySQLDatabaseType;
-import org.apache.shardingsphere.integration.data.pipeline.cases.BaseITCase;
 import org.apache.shardingsphere.integration.data.pipeline.cases.command.ExtraSQLCommand;
-import org.apache.shardingsphere.integration.data.pipeline.cases.postgresql.BasePostgreSQLITCase;
+import org.apache.shardingsphere.integration.data.pipeline.cases.common.SimpleIncrementTaskRunnable;
+import org.apache.shardingsphere.integration.data.pipeline.framework.helper.ScalingTableSQLHelper;
 import org.apache.shardingsphere.integration.data.pipeline.framework.param.ScalingParameterized;
-import org.apache.shardingsphere.integration.data.pipeline.util.TableCrudUtil;
+import org.apache.shardingsphere.sharding.spi.KeyGenerateAlgorithm;
 
 import javax.xml.bind.JAXB;
 import java.sql.Connection;
 import java.sql.DriverManager;
 import java.sql.SQLException;
-import java.util.List;
-import java.util.Objects;
 import java.util.Properties;
 
 public abstract class BaseMySQLITCase extends BaseITCase {
     
-    protected static final DatabaseType DATABASE = new MySQLDatabaseType();
+    protected static final DatabaseType DATABASE_TYPE = new MySQLDatabaseType();
     
     private final ExtraSQLCommand extraSQLCommand;
     
+    @Getter
+    private final ScalingTableSQLHelper sqlHelper;
+    
     public BaseMySQLITCase(final ScalingParameterized parameterized) {
         super(parameterized);
-        extraSQLCommand = JAXB.unmarshal(Objects.requireNonNull(BasePostgreSQLITCase.class.getClassLoader().getResource(parameterized.getParentPath() + "/sql.xml")), ExtraSQLCommand.class);
-        initTableAndData();
+        extraSQLCommand = JAXB.unmarshal(BaseMySQLITCase.class.getClassLoader().getResource(parameterized.getScenario()), ExtraSQLCommand.class);
+        sqlHelper = new ScalingTableSQLHelper(DATABASE_TYPE, extraSQLCommand, getJdbcTemplate());
     }
     
-    @SneakyThrows({SQLException.class, InterruptedException.class})
-    protected void initTableAndData() {
+    @SneakyThrows(SQLException.class)
+    protected void addResource() {
         Properties queryProps = createQueryProperties();
         // TODO if use jdbcurl like "jdbc:mysql:localhost:3307/sharding_db", will throw exception show "Datasource or ShardingSphere rule does not exist"
         try (Connection connection = DriverManager.getConnection(JDBC_URL_APPENDER.appendQueryProperties(getComposedContainer().getProxyJdbcUrl(""), queryProps), "root", "root")) {
             connection.createStatement().execute("USE sharding_db");
             addResource(connection);
         }
-        initShardingRule();
-        setIncreaseTaskThread(new Thread(new MySQLIncrementTaskRunnable(getJdbcTemplate(), extraSQLCommand)));
-        getJdbcTemplate().execute(extraSQLCommand.getCreateTableOrder());
-        getJdbcTemplate().execute(extraSQLCommand.getCreateTableOrderItem());
+    }
+    
+    protected void startIncrementTask(final KeyGenerateAlgorithm keyGenerateAlgorithm) {
+        setIncreaseTaskThread(new Thread(new SimpleIncrementTaskRunnable(getJdbcTemplate(), extraSQLCommand, keyGenerateAlgorithm)));
         getIncreaseTaskThread().start();
-        Pair<List<Object[]>, List<Object[]>> dataPair = TableCrudUtil.generateMySQLInsertDataList(3000);
-        getJdbcTemplate().batchUpdate(extraSQLCommand.getFullInsertOrder(), dataPair.getLeft());
-        getJdbcTemplate().batchUpdate(extraSQLCommand.getInsertOrderItem(), dataPair.getRight());
+    }
+    
+    /**
+     * Add an unused table, to test scaling part of the tables.
+     */
+    protected void createNoUseTable() {
+        getJdbcTemplate().execute("CREATE SHARDING TABLE RULE no_use (RESOURCES(ds_0, ds_1), SHARDING_COLUMN=sharding_id, TYPE(NAME=MOD,PROPERTIES('sharding-count'=4)))");
+        getJdbcTemplate().execute("CREATE TABLE no_use(id int(11) NOT NULL,sharding_id int(11) NOT NULL, PRIMARY KEY (id))");
+        getJdbcTemplate().execute("INSERT INTO no_use(id,sharding_id) values (1,1)");
+        getJdbcTemplate().execute("INSERT INTO no_use(id,sharding_id) values (2,2)");
     }
     
     @Override
diff --git a/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/openguass/BaseOpenGaussITCase.java b/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/base/BaseOpenGaussITCase.java
similarity index 63%
rename from shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/openguass/BaseOpenGaussITCase.java
rename to shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/base/BaseOpenGaussITCase.java
index 999ac73347c..d6d7e0508f2 100644
--- a/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/openguass/BaseOpenGaussITCase.java
+++ b/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/base/BaseOpenGaussITCase.java
@@ -15,53 +15,51 @@
  * limitations under the License.
  */
 
-package org.apache.shardingsphere.integration.data.pipeline.cases.openguass;
+package org.apache.shardingsphere.integration.data.pipeline.cases.base;
 
+import lombok.Getter;
 import lombok.SneakyThrows;
-import org.apache.commons.lang3.tuple.Pair;
 import org.apache.shardingsphere.infra.database.type.DatabaseType;
 import org.apache.shardingsphere.infra.database.type.dialect.OpenGaussDatabaseType;
-import org.apache.shardingsphere.integration.data.pipeline.cases.BaseITCase;
 import org.apache.shardingsphere.integration.data.pipeline.cases.command.ExtraSQLCommand;
-import org.apache.shardingsphere.integration.data.pipeline.cases.postgresql.PostgreSQLIncrementTaskRunnable;
+import org.apache.shardingsphere.integration.data.pipeline.cases.common.SimpleIncrementTaskRunnable;
+import org.apache.shardingsphere.integration.data.pipeline.framework.helper.ScalingTableSQLHelper;
 import org.apache.shardingsphere.integration.data.pipeline.framework.param.ScalingParameterized;
-import org.apache.shardingsphere.integration.data.pipeline.util.TableCrudUtil;
+import org.apache.shardingsphere.sharding.spi.KeyGenerateAlgorithm;
 
 import javax.xml.bind.JAXB;
 import java.sql.Connection;
 import java.sql.DriverManager;
 import java.sql.SQLException;
-import java.util.List;
-import java.util.Objects;
 import java.util.Properties;
 
 public abstract class BaseOpenGaussITCase extends BaseITCase {
     
-    protected static final DatabaseType DATABASE = new OpenGaussDatabaseType();
+    protected static final DatabaseType DATABASE_TYPE = new OpenGaussDatabaseType();
     
+    @Getter
     private final ExtraSQLCommand extraSQLCommand;
     
+    @Getter
+    private final ScalingTableSQLHelper sqlHelper;
+    
     public BaseOpenGaussITCase(final ScalingParameterized parameterized) {
         super(parameterized);
-        extraSQLCommand = JAXB.unmarshal(Objects.requireNonNull(BaseOpenGaussITCase.class.getClassLoader().getResource(parameterized.getParentPath() + "/sql.xml")), ExtraSQLCommand.class);
-        initTableAndData();
+        extraSQLCommand = JAXB.unmarshal(BaseOpenGaussITCase.class.getClassLoader().getResource(parameterized.getScenario()), ExtraSQLCommand.class);
+        sqlHelper = new ScalingTableSQLHelper(DATABASE_TYPE, extraSQLCommand, getJdbcTemplate());
     }
     
-    @SneakyThrows({SQLException.class, InterruptedException.class})
-    protected void initTableAndData() {
+    @SneakyThrows(SQLException.class)
+    protected void addResource() {
         Properties queryProps = createQueryProperties();
         try (Connection connection = DriverManager.getConnection(JDBC_URL_APPENDER.appendQueryProperties(getComposedContainer().getProxyJdbcUrl("sharding_db"), queryProps), "root", "root")) {
             addResource(connection, "gaussdb", "Root@123");
         }
-        initShardingRule();
-        createSchema("test");
-        setIncreaseTaskThread(new Thread(new PostgreSQLIncrementTaskRunnable(getJdbcTemplate(), extraSQLCommand)));
-        getJdbcTemplate().execute(extraSQLCommand.getCreateTableOrder());
-        getJdbcTemplate().execute(extraSQLCommand.getCreateTableOrderItem());
+    }
+    
+    protected void startIncrementTask(final KeyGenerateAlgorithm keyGenerateAlgorithm) {
+        setIncreaseTaskThread(new Thread(new SimpleIncrementTaskRunnable(getJdbcTemplate(), extraSQLCommand, keyGenerateAlgorithm)));
         getIncreaseTaskThread().start();
-        Pair<List<Object[]>, List<Object[]>> dataPair = TableCrudUtil.generatePostgresSQLInsertDataList(3000);
-        getJdbcTemplate().batchUpdate(extraSQLCommand.getFullInsertOrder(), dataPair.getLeft());
-        getJdbcTemplate().batchUpdate(extraSQLCommand.getInsertOrderItem(), dataPair.getRight());
     }
     
     @Override
diff --git a/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/postgresql/BasePostgreSQLITCase.java b/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/base/BasePostgreSQLITCase.java
similarity index 65%
rename from shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/postgresql/BasePostgreSQLITCase.java
rename to shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/base/BasePostgreSQLITCase.java
index 1142758cd57..a439ac8ca0b 100644
--- a/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/postgresql/BasePostgreSQLITCase.java
+++ b/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/base/BasePostgreSQLITCase.java
@@ -15,52 +15,50 @@
  * limitations under the License.
  */
 
-package org.apache.shardingsphere.integration.data.pipeline.cases.postgresql;
+package org.apache.shardingsphere.integration.data.pipeline.cases.base;
 
+import lombok.Getter;
 import lombok.SneakyThrows;
-import org.apache.commons.lang3.tuple.Pair;
 import org.apache.shardingsphere.infra.database.type.DatabaseType;
 import org.apache.shardingsphere.infra.database.type.dialect.PostgreSQLDatabaseType;
-import org.apache.shardingsphere.integration.data.pipeline.cases.BaseITCase;
 import org.apache.shardingsphere.integration.data.pipeline.cases.command.ExtraSQLCommand;
+import org.apache.shardingsphere.integration.data.pipeline.cases.common.SimpleIncrementTaskRunnable;
+import org.apache.shardingsphere.integration.data.pipeline.framework.helper.ScalingTableSQLHelper;
 import org.apache.shardingsphere.integration.data.pipeline.framework.param.ScalingParameterized;
-import org.apache.shardingsphere.integration.data.pipeline.util.TableCrudUtil;
+import org.apache.shardingsphere.sharding.spi.KeyGenerateAlgorithm;
 
 import javax.xml.bind.JAXB;
 import java.sql.Connection;
 import java.sql.DriverManager;
 import java.sql.SQLException;
-import java.util.List;
-import java.util.Objects;
 import java.util.Properties;
 
 public abstract class BasePostgreSQLITCase extends BaseITCase {
     
-    protected static final DatabaseType DATABASE = new PostgreSQLDatabaseType();
+    protected static final DatabaseType DATABASE_TYPE = new PostgreSQLDatabaseType();
     
     private final ExtraSQLCommand extraSQLCommand;
     
+    @Getter
+    private final ScalingTableSQLHelper sqlHelper;
+    
     public BasePostgreSQLITCase(final ScalingParameterized parameterized) {
         super(parameterized);
-        extraSQLCommand = JAXB.unmarshal(Objects.requireNonNull(BasePostgreSQLITCase.class.getClassLoader().getResource(parameterized.getParentPath() + "/sql.xml")), ExtraSQLCommand.class);
-        initTableAndData();
+        extraSQLCommand = JAXB.unmarshal(BasePostgreSQLITCase.class.getClassLoader().getResource(parameterized.getScenario()), ExtraSQLCommand.class);
+        sqlHelper = new ScalingTableSQLHelper(DATABASE_TYPE, extraSQLCommand, getJdbcTemplate());
     }
     
-    @SneakyThrows({SQLException.class, InterruptedException.class})
-    protected void initTableAndData() {
+    @SneakyThrows(SQLException.class)
+    protected void addResource() {
         Properties queryProps = createQueryProperties();
         try (Connection connection = DriverManager.getConnection(JDBC_URL_APPENDER.appendQueryProperties(getComposedContainer().getProxyJdbcUrl("sharding_db"), queryProps), "root", "root")) {
             addResource(connection);
         }
-        initShardingRule();
-        createSchema("test");
-        setIncreaseTaskThread(new Thread(new PostgreSQLIncrementTaskRunnable(getJdbcTemplate(), extraSQLCommand)));
-        getJdbcTemplate().execute(extraSQLCommand.getCreateTableOrder());
-        getJdbcTemplate().execute(extraSQLCommand.getCreateTableOrderItem());
+    }
+    
+    protected void startIncrementTask(final KeyGenerateAlgorithm keyGenerateAlgorithm) {
+        setIncreaseTaskThread(new Thread(new SimpleIncrementTaskRunnable(getJdbcTemplate(), extraSQLCommand, keyGenerateAlgorithm)));
         getIncreaseTaskThread().start();
-        Pair<List<Object[]>, List<Object[]>> dataPair = TableCrudUtil.generatePostgresSQLInsertDataList(3000);
-        getJdbcTemplate().batchUpdate(extraSQLCommand.getFullInsertOrder(), dataPair.getLeft());
-        getJdbcTemplate().batchUpdate(extraSQLCommand.getInsertOrderItem(), dataPair.getRight());
     }
     
     @Override
diff --git a/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/base/BaseTaskRunnable.java b/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/base/BaseTaskRunnable.java
new file mode 100644
index 00000000000..cfb166a770f
--- /dev/null
+++ b/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/base/BaseTaskRunnable.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.integration.data.pipeline.cases.base;
+
+import lombok.AllArgsConstructor;
+import lombok.Getter;
+import org.apache.shardingsphere.integration.data.pipeline.cases.command.ExtraSQLCommand;
+import org.apache.shardingsphere.sharding.spi.KeyGenerateAlgorithm;
+import org.springframework.jdbc.core.JdbcTemplate;
+
+import java.time.Instant;
+import java.util.concurrent.ThreadLocalRandom;
+
+@Getter
+@AllArgsConstructor
+public abstract class BaseTaskRunnable implements Runnable {
+    
+    private final JdbcTemplate jdbcTemplate;
+    
+    private final ExtraSQLCommand extraSQLCommand;
+    
+    private final KeyGenerateAlgorithm keyGenerateAlgorithm;
+    
+    protected abstract Object[] getOrderInsertDate();
+    
+    protected abstract Object[] getOrderInsertItemDate();
+    
+    /**
+     * Insert order.
+     *
+     * @return primary key of insert data
+     */
+    public Object insertOrder() {
+        Object[] orderInsertDate = getOrderInsertDate();
+        jdbcTemplate.update(extraSQLCommand.getInsertOrder(), orderInsertDate);
+        return orderInsertDate[0];
+    }
+    
+    /**
+     * Insert order item.
+     *
+     * @return primary key of insert data
+     */
+    public Object insertOrderItem() {
+        Object[] orderInsertItemDate = getOrderInsertItemDate();
+        jdbcTemplate.update(extraSQLCommand.getInsertOrderItem(), orderInsertItemDate);
+        return orderInsertItemDate[0];
+    }
+    
+    /**
+     * Update order by primary key.
+     *
+     * @param primaryKey primary key
+     */
+    public void updateOrderByPrimaryKey(final Object primaryKey) {
+        jdbcTemplate.update(extraSQLCommand.getUpdateOrderById(), "updated" + Instant.now().getEpochSecond(), null, primaryKey);
+        jdbcTemplate.update(extraSQLCommand.getUpdateOrderById(), "updated" + Instant.now().getEpochSecond(), ThreadLocalRandom.current().nextInt(0, 100), primaryKey);
+    }
+    
+    /**
+     * Update order item by primary key.
+     *
+     * @param primaryKey primary key
+     */
+    public void updateOrderItemByPrimaryKey(final Object primaryKey) {
+        jdbcTemplate.update(extraSQLCommand.getUpdateOrderItemById(), "updated" + Instant.now().getEpochSecond(), primaryKey);
+    }
+    
+    /**
+     * Delete order by primary key.
+     *
+     * @param primaryKey primary key
+     */
+    public void deleteOrderByPrimaryKey(final Object primaryKey) {
+        jdbcTemplate.update(extraSQLCommand.getDeleteOrderById(), primaryKey);
+    }
+    
+    /**
+     * Delete order item by primary key.
+     *
+     * @param primaryKey primary key
+     */
+    public void deleteOrderItemByPrimaryKey(final Object primaryKey) {
+        jdbcTemplate.update(extraSQLCommand.getDeleteOrderItemById(), primaryKey);
+    }
+}
diff --git a/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/command/CommonSQLCommand.java b/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/command/CommonSQLCommand.java
index 17819cf2463..3b50e42a6ec 100644
--- a/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/command/CommonSQLCommand.java
+++ b/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/command/CommonSQLCommand.java
@@ -23,18 +23,26 @@ import javax.xml.bind.annotation.XmlAccessType;
 import javax.xml.bind.annotation.XmlAccessorType;
 import javax.xml.bind.annotation.XmlElement;
 import javax.xml.bind.annotation.XmlRootElement;
-import java.util.List;
 
 @Data
 @XmlRootElement(name = "command")
 @XmlAccessorType(XmlAccessType.FIELD)
 public final class CommonSQLCommand {
     
-    @XmlElement(name = "create-sharding-algorithm")
-    private List<String> createShardingAlgorithm;
+    @XmlElement(name = "create-database-sharding-algorithm")
+    private String createDatabaseShardingAlgorithm;
     
-    @XmlElement(name = "create-sharding-table")
-    private String createShardingTable;
+    @XmlElement(name = "create-order-sharding-algorithm")
+    private String createOrderShardingAlgorithm;
+    
+    @XmlElement(name = "create-order-item-sharding-algorithm")
+    private String createOrderItemShardingAlgorithm;
+    
+    @XmlElement(name = "create-all-sharding-table-rule")
+    private String createAllSharingTableRule;
+    
+    @XmlElement(name = "create-order-sharding-table-rule")
+    private String createOrderShardingTableRule;
     
     @XmlElement(name = "alter-sharding-algorithm")
     private String alterShardingAlgorithm;
diff --git a/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/common/SimpleIncrementTaskRunnable.java b/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/common/SimpleIncrementTaskRunnable.java
new file mode 100644
index 00000000000..61a5d32ed74
--- /dev/null
+++ b/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/common/SimpleIncrementTaskRunnable.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.integration.data.pipeline.cases.common;
+
+import lombok.extern.slf4j.Slf4j;
+import org.apache.shardingsphere.integration.data.pipeline.cases.base.BaseTaskRunnable;
+import org.apache.shardingsphere.integration.data.pipeline.cases.command.ExtraSQLCommand;
+import org.apache.shardingsphere.sharding.spi.KeyGenerateAlgorithm;
+import org.springframework.jdbc.core.JdbcTemplate;
+
+import java.util.concurrent.ThreadLocalRandom;
+
+@Slf4j
+public final class SimpleIncrementTaskRunnable extends BaseTaskRunnable {
+    
+    public SimpleIncrementTaskRunnable(final JdbcTemplate jdbcTemplate, final ExtraSQLCommand extraSQLCommand, final KeyGenerateAlgorithm keyGenerateAlgorithm) {
+        super(jdbcTemplate, extraSQLCommand, keyGenerateAlgorithm);
+    }
+    
+    @Override
+    public void run() {
+        int executeCount = 0;
+        while (executeCount < 20 && !Thread.currentThread().isInterrupted()) {
+            Object orderPrimaryKey = insertOrder();
+            Object orderItemPrimaryKey = insertOrderItem();
+            if (executeCount % 2 == 0) {
+                deleteOrderByPrimaryKey(orderPrimaryKey);
+                deleteOrderItemByPrimaryKey(orderItemPrimaryKey);
+            } else {
+                updateOrderByPrimaryKey(orderPrimaryKey);
+                updateOrderItemByPrimaryKey(orderItemPrimaryKey);
+            }
+            executeCount++;
+            log.info("Simple increment task runnable execute successfully.");
+        }
+    }
+    
+    @Override
+    protected Object[] getOrderInsertDate() {
+        return new Object[]{getKeyGenerateAlgorithm().generateKey(), ThreadLocalRandom.current().nextInt(0, 6), ThreadLocalRandom.current().nextInt(0, 6)};
+    }
+    
+    @Override
+    protected Object[] getOrderInsertItemDate() {
+        return new Object[]{getKeyGenerateAlgorithm().generateKey(), ThreadLocalRandom.current().nextInt(0, 6), ThreadLocalRandom.current().nextInt(0, 6), "OK"};
+    }
+}
diff --git a/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/mysql/MySQLIncrementTaskRunnable.java b/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/mysql/MySQLIncrementTaskRunnable.java
deleted file mode 100644
index fd60e4fdaed..00000000000
--- a/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/mysql/MySQLIncrementTaskRunnable.java
+++ /dev/null
@@ -1,79 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.shardingsphere.integration.data.pipeline.cases.mysql;
-
-import lombok.AllArgsConstructor;
-import lombok.extern.slf4j.Slf4j;
-import org.apache.commons.lang3.tuple.Pair;
-import org.apache.shardingsphere.integration.data.pipeline.cases.command.ExtraSQLCommand;
-import org.apache.shardingsphere.integration.data.pipeline.util.TableCrudUtil;
-import org.springframework.jdbc.core.JdbcTemplate;
-
-import java.sql.SQLException;
-import java.time.Instant;
-import java.util.LinkedList;
-import java.util.List;
-
-@Slf4j
-@AllArgsConstructor
-public final class MySQLIncrementTaskRunnable implements Runnable {
-    
-    private final JdbcTemplate jdbcTemplate;
-    
-    private final ExtraSQLCommand extraSQLCommand;
-    
-    @Override
-    public void run() {
-        int executeCount = 0;
-        List<Long> newPrimaryKeys = new LinkedList<>();
-        try {
-            while (executeCount < 20 && !Thread.currentThread().isInterrupted()) {
-                newPrimaryKeys.add(insertOrderAndOrderItem());
-                if (newPrimaryKeys.size() % 2 == 0) {
-                    deleteOrderAndOrderItem(newPrimaryKeys.get(newPrimaryKeys.size() - 1));
-                } else {
-                    updateOrderAndOrderItem(newPrimaryKeys.get(newPrimaryKeys.size() - 1));
-                }
-                executeCount++;
-                log.info("Increment task runnable execute successfully.");
-            }
-        } catch (final SQLException ex) {
-            log.error("IncrementTaskThread error", ex);
-            throw new RuntimeException(ex);
-        }
-    }
-    
-    private long insertOrderAndOrderItem() throws SQLException {
-        Pair<Object[], Object[]> dataPair = TableCrudUtil.generateSimpleInsertData();
-        jdbcTemplate.update(extraSQLCommand.getInsertOrder(), dataPair.getLeft());
-        jdbcTemplate.update(extraSQLCommand.getInsertOrderItem(), dataPair.getRight());
-        return Long.parseLong(dataPair.getLeft()[0].toString());
-    }
-    
-    private void updateOrderAndOrderItem(final long primaryKey) throws SQLException {
-        jdbcTemplate.update(extraSQLCommand.getUpdateOrderById(), null, null, primaryKey);
-        long epochSecond = Instant.now().getEpochSecond();
-        jdbcTemplate.update(extraSQLCommand.getUpdateOrderById(), "update" + epochSecond, epochSecond, primaryKey);
-        jdbcTemplate.update(extraSQLCommand.getUpdateOrderItemById(), "changed" + epochSecond, primaryKey);
-    }
-    
-    private void deleteOrderAndOrderItem(final long primaryKey) throws SQLException {
-        jdbcTemplate.update(extraSQLCommand.getDeleteOrderById(), primaryKey);
-        jdbcTemplate.update(extraSQLCommand.getDeleteOrderItemById(), primaryKey);
-    }
-}
diff --git a/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/mysql/MySQLManualScalingIT.java b/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/mysql/MySQLManualScalingIT.java
index 21941aa1386..eaa1f0aa360 100644
--- a/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/mysql/MySQLManualScalingIT.java
+++ b/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/mysql/MySQLManualScalingIT.java
@@ -17,11 +17,15 @@
 
 package org.apache.shardingsphere.integration.data.pipeline.cases.mysql;
 
+import com.google.common.base.Joiner;
 import com.google.common.base.Strings;
-import com.google.common.collect.Sets;
 import lombok.extern.slf4j.Slf4j;
+import org.apache.shardingsphere.integration.data.pipeline.cases.base.BaseMySQLITCase;
+import org.apache.shardingsphere.integration.data.pipeline.cases.scenario.ScalingScenario;
 import org.apache.shardingsphere.integration.data.pipeline.env.IntegrationTestEnvironment;
 import org.apache.shardingsphere.integration.data.pipeline.framework.param.ScalingParameterized;
+import org.apache.shardingsphere.sharding.algorithm.keygen.SnowflakeKeyGenerateAlgorithm;
+import org.junit.Before;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
@@ -29,13 +33,7 @@ import org.junit.runners.Parameterized.Parameters;
 
 import java.util.Collection;
 import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.stream.Collectors;
-
-import static org.hamcrest.CoreMatchers.is;
-import static org.junit.Assert.assertThat;
+import java.util.concurrent.TimeUnit;
 
 /**
  * MySQL manual scaling test case.
@@ -57,20 +55,35 @@ public final class MySQLManualScalingIT extends BaseMySQLITCase {
             if (Strings.isNullOrEmpty(version)) {
                 continue;
             }
-            result.add(new ScalingParameterized(DATABASE, version, "env/scenario/manual/mysql"));
+            for (String scenario : ScalingScenario.listScenario()) {
+                result.add(new ScalingParameterized(DATABASE_TYPE, version, Joiner.on("/").join("env/scenario/manual/mysql", scenario, ScalingScenario.SCENARIO_SUFFIX)));
+            }
         }
         return result;
     }
     
+    @Before
+    public void setUp() throws InterruptedException {
+        addResource();
+        initShardingAlgorithm();
+        // TODO wait for algorithm init
+        TimeUnit.SECONDS.sleep(2);
+        createScalingRule();
+    }
+    
     @Test
     public void assertManualScalingSuccess() throws InterruptedException {
-        List<Map<String, Object>> previewResults = getJdbcTemplate().queryForList("PREVIEW SELECT COUNT(1) FROM t_order");
-        Set<Object> originalSources = previewResults.stream().map(each -> each.get("data_source_name")).collect(Collectors.toSet());
-        assertThat(originalSources, is(Sets.newHashSet("ds_0", "ds_1")));
+        createAllSharingTableRule();
+        bindingShardingRule();
+        createNoUseTable();
+        getSqlHelper().createOrderTable();
+        getSqlHelper().createOrderItemTable();
+        getSqlHelper().initTableData(true);
+        startIncrementTask(new SnowflakeKeyGenerateAlgorithm());
+        assertOriginalSourceSuccess();
         getJdbcTemplate().execute(getCommonSQLCommand().getAutoAlterTableRule());
-        Map<String, Object> showScalingResMap = getJdbcTemplate().queryForMap("SHOW SCALING LIST");
-        String jobId = showScalingResMap.get("id").toString();
-        getIncreaseTaskThread().join(60 * 1000);
-        checkMatchConsistency(getJdbcTemplate(), jobId);
+        String jobId = String.valueOf(getJdbcTemplate().queryForMap("SHOW SCALING LIST").get("id"));
+        getIncreaseTaskThread().join(60 * 1000L);
+        assertCheckMatchConsistencySuccess(getJdbcTemplate(), jobId);
     }
 }
diff --git a/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/openguass/OpenGaussManualScalingIT.java b/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/openguass/OpenGaussManualScalingIT.java
index ade2634750e..4bfa7c36490 100644
--- a/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/openguass/OpenGaussManualScalingIT.java
+++ b/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/openguass/OpenGaussManualScalingIT.java
@@ -17,10 +17,16 @@
 
 package org.apache.shardingsphere.integration.data.pipeline.cases.openguass;
 
+import com.google.common.base.Joiner;
 import com.google.common.base.Strings;
-import com.google.common.collect.Sets;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.shardingsphere.integration.data.pipeline.cases.base.BaseOpenGaussITCase;
+import org.apache.shardingsphere.integration.data.pipeline.cases.scenario.ScalingScenario;
 import org.apache.shardingsphere.integration.data.pipeline.env.IntegrationTestEnvironment;
 import org.apache.shardingsphere.integration.data.pipeline.framework.param.ScalingParameterized;
+import org.apache.shardingsphere.integration.data.pipeline.util.TableCrudUtil;
+import org.apache.shardingsphere.sharding.algorithm.keygen.SnowflakeKeyGenerateAlgorithm;
+import org.junit.Before;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
@@ -29,12 +35,7 @@ import org.junit.runners.Parameterized.Parameters;
 import java.util.Collection;
 import java.util.LinkedList;
 import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.stream.Collectors;
-
-import static org.hamcrest.CoreMatchers.is;
-import static org.junit.Assert.assertThat;
+import java.util.concurrent.TimeUnit;
 
 @RunWith(Parameterized.class)
 public final class OpenGaussManualScalingIT extends BaseOpenGaussITCase {
@@ -52,22 +53,38 @@ public final class OpenGaussManualScalingIT extends BaseOpenGaussITCase {
             if (Strings.isNullOrEmpty(dockerImageName)) {
                 continue;
             }
-            result.add(new ScalingParameterized(DATABASE, dockerImageName, "env/scenario/manual/postgresql"));
+            for (String scenario : ScalingScenario.listScenario()) {
+                result.add(new ScalingParameterized(DATABASE_TYPE, dockerImageName, Joiner.on("/").join("env/scenario/manual/postgresql", scenario, ScalingScenario.SCENARIO_SUFFIX)));
+            }
         }
         return result;
     }
     
+    @Before
+    public void setUp() throws InterruptedException {
+        addResource();
+        initShardingAlgorithm();
+        // TODO wait for algorithm init
+        TimeUnit.SECONDS.sleep(2);
+        createScalingRule();
+        createSchema("test");
+    }
+    
     @Test
     public void assertManualScalingSuccess() throws InterruptedException {
-        List<Map<String, Object>> previewResults = getJdbcTemplate().queryForList("PREVIEW SELECT COUNT(1) FROM t_order");
-        Set<Object> originalSources = previewResults.stream().map(each -> each.get("data_source_name")).collect(Collectors.toSet());
-        assertThat(originalSources, is(Sets.newHashSet("ds_0", "ds_1")));
+        createAllSharingTableRule();
+        bindingShardingRule();
+        getSqlHelper().createOrderTable();
+        getSqlHelper().createOrderItemTable();
+        getSqlHelper().initTableData(true);
+        Pair<List<Object[]>, List<Object[]>> dataPair = TableCrudUtil.generatePostgresSQLInsertDataList(3000);
+        getJdbcTemplate().batchUpdate(getExtraSQLCommand().getFullInsertOrder(), dataPair.getLeft());
+        getJdbcTemplate().batchUpdate(getExtraSQLCommand().getInsertOrderItem(), dataPair.getRight());
+        startIncrementTask(new SnowflakeKeyGenerateAlgorithm());
+        assertOriginalSourceSuccess();
         getJdbcTemplate().execute(getCommonSQLCommand().getAutoAlterTableRule());
-        Map<String, Object> showScalingResMap = getJdbcTemplate().queryForMap("SHOW SCALING LIST");
-        String jobId = String.valueOf(showScalingResMap.get("id"));
-        if (null == getIncreaseTaskThread()) {
-            getIncreaseTaskThread().join(60 * 1000L);
-        }
-        checkMatchConsistency(getJdbcTemplate(), jobId);
+        String jobId = String.valueOf(getJdbcTemplate().queryForMap("SHOW SCALING LIST").get("id"));
+        getIncreaseTaskThread().join(60 * 1000L);
+        assertCheckMatchConsistencySuccess(getJdbcTemplate(), jobId);
     }
 }
diff --git a/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/postgresql/PostgreSQLDDLGeneratorIT.java b/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/postgresql/PostgreSQLDDLGeneratorIT.java
index 25bf61e5c78..4103338af58 100644
--- a/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/postgresql/PostgreSQLDDLGeneratorIT.java
+++ b/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/postgresql/PostgreSQLDDLGeneratorIT.java
@@ -17,6 +17,7 @@
 
 package org.apache.shardingsphere.integration.data.pipeline.cases.postgresql;
 
+import com.google.common.base.Joiner;
 import com.google.common.base.Strings;
 import com.zaxxer.hikari.HikariDataSource;
 import org.apache.shardingsphere.data.pipeline.spi.ddlgenerator.DialectDDLSQLGeneratorFactory;
@@ -52,7 +53,7 @@ import static org.junit.Assert.assertThat;
 @RunWith(Parameterized.class)
 public final class PostgreSQLDDLGeneratorIT {
     
-    private static final String CASE_FILE_PATH = "/ddlgenerator.xml";
+    private static final String CASE_FILE_PATH = "ddlgenerator.xml";
     
     private static final String PARENT_PATH = "env/scenario/manual/postgresql/ddlgenerator";
     
@@ -70,7 +71,7 @@ public final class PostgreSQLDDLGeneratorIT {
     
     public PostgreSQLDDLGeneratorIT(final ScalingParameterized parameterized) {
         this.parameterized = parameterized;
-        this.rootEntity = JAXB.unmarshal(Objects.requireNonNull(PostgreSQLDDLGeneratorIT.class.getClassLoader().getResource(parameterized.getParentPath() + CASE_FILE_PATH)),
+        this.rootEntity = JAXB.unmarshal(Objects.requireNonNull(PostgreSQLDDLGeneratorIT.class.getClassLoader().getResource(parameterized.getScenario())),
                 DDLGeneratorAssertionsRootEntity.class);
         this.dockerDatabaseContainer = DatabaseContainerFactory.newInstance(parameterized.getDatabaseType(), parameterized.getDockerImageName());
         dockerDatabaseContainer.start();
@@ -83,7 +84,7 @@ public final class PostgreSQLDDLGeneratorIT {
             if (Strings.isNullOrEmpty(each)) {
                 continue;
             }
-            result.add(new ScalingParameterized(new PostgreSQLDatabaseType(), each, PARENT_PATH));
+            result.add(new ScalingParameterized(new PostgreSQLDatabaseType(), each, Joiner.on("/").join(PARENT_PATH, CASE_FILE_PATH)));
         }
         return result;
     }
diff --git a/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/postgresql/PostgreSQLIncrementTaskRunnable.java b/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/postgresql/PostgreSQLIncrementTaskRunnable.java
deleted file mode 100644
index e090bb59b74..00000000000
--- a/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/postgresql/PostgreSQLIncrementTaskRunnable.java
+++ /dev/null
@@ -1,78 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.shardingsphere.integration.data.pipeline.cases.postgresql;
-
-import lombok.AllArgsConstructor;
-import lombok.extern.slf4j.Slf4j;
-import org.apache.commons.lang3.tuple.Pair;
-import org.apache.shardingsphere.integration.data.pipeline.cases.command.ExtraSQLCommand;
-import org.apache.shardingsphere.integration.data.pipeline.util.TableCrudUtil;
-import org.springframework.jdbc.core.JdbcTemplate;
-
-import java.sql.SQLException;
-import java.time.Instant;
-import java.util.LinkedList;
-import java.util.List;
-
-@Slf4j
-@AllArgsConstructor
-public final class PostgreSQLIncrementTaskRunnable implements Runnable {
-    
-    private final JdbcTemplate jdbcTemplate;
-    
-    private final ExtraSQLCommand extraSQLCommand;
-    
-    @Override
-    public void run() {
-        int executeCount = 0;
-        List<Long> newPrimaryKeys = new LinkedList<>();
-        try {
-            while (executeCount < 20 && !Thread.currentThread().isInterrupted()) {
-                newPrimaryKeys.add(insertOrderAndOrderItem());
-                if (newPrimaryKeys.size() % 2 == 0) {
-                    deleteOrderAndOrderItem(newPrimaryKeys.get(newPrimaryKeys.size() - 1));
-                } else {
-                    updateOrderAndOrderItem(newPrimaryKeys.get(newPrimaryKeys.size() - 1));
-                }
-                executeCount++;
-                log.info("Increment task runnable execute successfully.");
-            }
-        } catch (final SQLException ex) {
-            log.error("IncrementTaskThread error", ex);
-            throw new RuntimeException(ex);
-        }
-    }
-    
-    private long insertOrderAndOrderItem() throws SQLException {
-        Pair<Object[], Object[]> dataPair = TableCrudUtil.generateSimpleInsertData();
-        jdbcTemplate.update(extraSQLCommand.getInsertOrder(), dataPair.getLeft());
-        jdbcTemplate.update(extraSQLCommand.getInsertOrderItem(), dataPair.getRight());
-        return Long.parseLong(dataPair.getLeft()[0].toString());
-    }
-    
-    private void updateOrderAndOrderItem(final long primaryKey) throws SQLException {
-        long epochSecond = Instant.now().getEpochSecond();
-        jdbcTemplate.update(extraSQLCommand.getUpdateOrderById(), "update" + epochSecond, primaryKey);
-        jdbcTemplate.update(extraSQLCommand.getUpdateOrderItemById(), "changed" + epochSecond, primaryKey);
-    }
-    
-    private void deleteOrderAndOrderItem(final long primaryKey) throws SQLException {
-        jdbcTemplate.update(extraSQLCommand.getDeleteOrderById(), primaryKey);
-        jdbcTemplate.update(extraSQLCommand.getDeleteOrderItemById(), primaryKey);
-    }
-}
diff --git a/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/postgresql/PostgreSQLManualScalingIT.java b/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/postgresql/PostgreSQLManualScalingIT.java
index 5584c25504c..6a600831799 100644
--- a/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/postgresql/PostgreSQLManualScalingIT.java
+++ b/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/postgresql/PostgreSQLManualScalingIT.java
@@ -17,10 +17,14 @@
 
 package org.apache.shardingsphere.integration.data.pipeline.cases.postgresql;
 
+import com.google.common.base.Joiner;
 import com.google.common.base.Strings;
-import com.google.common.collect.Sets;
+import org.apache.shardingsphere.integration.data.pipeline.cases.base.BasePostgreSQLITCase;
+import org.apache.shardingsphere.integration.data.pipeline.cases.scenario.ScalingScenario;
 import org.apache.shardingsphere.integration.data.pipeline.env.IntegrationTestEnvironment;
 import org.apache.shardingsphere.integration.data.pipeline.framework.param.ScalingParameterized;
+import org.apache.shardingsphere.sharding.algorithm.keygen.SnowflakeKeyGenerateAlgorithm;
+import org.junit.Before;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
@@ -28,13 +32,7 @@ import org.junit.runners.Parameterized.Parameters;
 
 import java.util.Collection;
 import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.stream.Collectors;
-
-import static org.hamcrest.CoreMatchers.is;
-import static org.junit.Assert.assertThat;
+import java.util.concurrent.TimeUnit;
 
 @RunWith(Parameterized.class)
 public final class PostgreSQLManualScalingIT extends BasePostgreSQLITCase {
@@ -52,20 +50,35 @@ public final class PostgreSQLManualScalingIT extends BasePostgreSQLITCase {
             if (Strings.isNullOrEmpty(dockerImageName)) {
                 continue;
             }
-            result.add(new ScalingParameterized(DATABASE, dockerImageName, "env/scenario/manual/postgresql"));
+            for (String scenario : ScalingScenario.listScenario()) {
+                result.add(new ScalingParameterized(DATABASE_TYPE, dockerImageName, Joiner.on("/").join("env/scenario/manual/postgresql", scenario, ScalingScenario.SCENARIO_SUFFIX)));
+            }
         }
         return result;
     }
     
+    @Before
+    public void setUp() throws InterruptedException {
+        addResource();
+        initShardingAlgorithm();
+        // TODO wait for algorithm init
+        TimeUnit.SECONDS.sleep(2);
+        createScalingRule();
+        createSchema("test");
+    }
+    
     @Test
     public void assertManualScalingSuccess() throws InterruptedException {
-        List<Map<String, Object>> previewResults = getJdbcTemplate().queryForList("PREVIEW SELECT COUNT(1) FROM t_order");
-        Set<Object> originalSources = previewResults.stream().map(each -> each.get("data_source_name")).collect(Collectors.toSet());
-        assertThat(originalSources, is(Sets.newHashSet("ds_0", "ds_1")));
+        createAllSharingTableRule();
+        bindingShardingRule();
+        getSqlHelper().createOrderTable();
+        getSqlHelper().createOrderItemTable();
+        getSqlHelper().initTableData(true);
+        startIncrementTask(new SnowflakeKeyGenerateAlgorithm());
+        assertOriginalSourceSuccess();
         getJdbcTemplate().execute(getCommonSQLCommand().getAutoAlterTableRule());
-        Map<String, Object> showScalingResMap = getJdbcTemplate().queryForMap("SHOW SCALING LIST");
-        String jobId = String.valueOf(showScalingResMap.get("id"));
+        String jobId = String.valueOf(getJdbcTemplate().queryForMap("SHOW SCALING LIST").get("id"));
         getIncreaseTaskThread().join(60 * 1000L);
-        checkMatchConsistency(getJdbcTemplate(), jobId);
+        assertCheckMatchConsistencySuccess(getJdbcTemplate(), jobId);
     }
 }
diff --git a/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/framework/param/ScalingParameterized.java b/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/scenario/ScalingScenario.java
similarity index 63%
copy from shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/framework/param/ScalingParameterized.java
copy to shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/scenario/ScalingScenario.java
index 27e6f1e648a..4c9aeb9dd63 100644
--- a/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/framework/param/ScalingParameterized.java
+++ b/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/scenario/ScalingScenario.java
@@ -15,21 +15,25 @@
  * limitations under the License.
  */
 
-package org.apache.shardingsphere.integration.data.pipeline.framework.param;
+package org.apache.shardingsphere.integration.data.pipeline.cases.scenario;
 
-import lombok.AllArgsConstructor;
-import lombok.Data;
-import lombok.NoArgsConstructor;
-import org.apache.shardingsphere.infra.database.type.DatabaseType;
+import com.google.common.collect.Sets;
 
-@Data
-@AllArgsConstructor
-@NoArgsConstructor
-public final class ScalingParameterized {
-    
-    private DatabaseType databaseType;
+import java.util.Collection;
+
+/**
+ * Manual scaling scenario.
+ */
+public final class ScalingScenario {
     
-    private String dockerImageName;
+    public static final String SCENARIO_SUFFIX = "sql.xml";
     
-    private String parentPath;
+    /**
+     * Manual scenario list.
+     *
+     * @return scenario list
+     */
+    public static Collection<String> listScenario() {
+        return Sets.newHashSet("integer_primary_key");
+    }
 }
diff --git a/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/framework/container/proxy/ShardingSphereProxyDockerContainer.java b/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/framework/container/proxy/ShardingSphereProxyDockerContainer.java
index d78b497d1c6..b437a690bf0 100644
--- a/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/framework/container/proxy/ShardingSphereProxyDockerContainer.java
+++ b/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/framework/container/proxy/ShardingSphereProxyDockerContainer.java
@@ -26,7 +26,6 @@ import org.apache.shardingsphere.test.integration.env.DataSourceEnvironment;
 import org.apache.shardingsphere.test.integration.framework.container.wait.JDBCConnectionWaitStrategy;
 import org.apache.shardingsphere.test.integration.framework.container.atomic.DockerITContainer;
 import org.testcontainers.containers.BindMode;
-import org.testcontainers.containers.output.Slf4jLogConsumer;
 
 import java.sql.DriverManager;
 
@@ -52,7 +51,6 @@ public final class ShardingSphereProxyDockerContainer extends DockerITContainer
         } else {
             setWaitStrategy(new JDBCConnectionWaitStrategy(() -> DriverManager.getConnection(DataSourceEnvironment.getURL(databaseType, getHost(), getMappedPort(3307), ""), "root", "root")));
         }
-        withLogConsumer(new Slf4jLogConsumer(log).withSeparateOutputStreams());
     }
     
     private void mapConfigurationFiles() {
diff --git a/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/framework/helper/ScalingTableSQLHelper.java b/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/framework/helper/ScalingTableSQLHelper.java
new file mode 100644
index 00000000000..0bfce120ec2
--- /dev/null
+++ b/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/framework/helper/ScalingTableSQLHelper.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.integration.data.pipeline.framework.helper;
+
+import lombok.AllArgsConstructor;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.shardingsphere.infra.database.type.DatabaseType;
+import org.apache.shardingsphere.infra.database.type.dialect.MySQLDatabaseType;
+import org.apache.shardingsphere.infra.database.type.dialect.OpenGaussDatabaseType;
+import org.apache.shardingsphere.infra.database.type.dialect.PostgreSQLDatabaseType;
+import org.apache.shardingsphere.integration.data.pipeline.cases.command.ExtraSQLCommand;
+import org.apache.shardingsphere.integration.data.pipeline.util.TableCrudUtil;
+import org.springframework.jdbc.core.JdbcTemplate;
+
+import java.util.List;
+
+/**
+ * Scaling table SQL helper, runs the create table and initial insert statements taken from the extra SQL command.
+ */
+@AllArgsConstructor
+public final class ScalingTableSQLHelper {
+    
+    private final DatabaseType databaseType;
+    
+    private final ExtraSQLCommand extraSQLCommand;
+    
+    private final JdbcTemplate jdbcTemplate;
+    
+    /**
+     * Create order table.
+     */
+    public void createOrderTable() {
+        jdbcTemplate.execute(extraSQLCommand.getCreateTableOrder());
+    }
+    
+    /**
+     * Create order item table.
+     */
+    public void createOrderItemTable() {
+        jdbcTemplate.execute(extraSQLCommand.getCreateTableOrderItem());
+    }
+    
+    /**
+     * Init table data.
+     *
+     * <p>Obtains insert data from {@link TableCrudUtil} with the generator matching the database type, batch inserts the left part
+     * with the full insert order SQL and, if requested, the right part with the insert order item SQL.</p>
+     *
+     * @param initOrderItemTogether whether init order item table together
+     * @throws UnsupportedOperationException if the database type is not MySQL, PostgreSQL or openGauss
+     */
+    public void initTableData(final boolean initOrderItemTogether) {
+        Pair<List<Object[]>, List<Object[]>> dataPair;
+        if (databaseType instanceof MySQLDatabaseType) {
+            dataPair = TableCrudUtil.generateMySQLInsertDataList(3000);
+        } else if (databaseType instanceof PostgreSQLDatabaseType || databaseType instanceof OpenGaussDatabaseType) {
+            dataPair = TableCrudUtil.generatePostgresSQLInsertDataList(3000);
+        } else {
+            // Fail with a descriptive error instead of handing a null batch list to JdbcTemplate.
+            throw new UnsupportedOperationException("Unsupported database type for init table data: " + (null == databaseType ? null : databaseType.getClass().getName()));
+        }
+        jdbcTemplate.batchUpdate(extraSQLCommand.getFullInsertOrder(), dataPair.getLeft());
+        if (initOrderItemTogether) {
+            jdbcTemplate.batchUpdate(extraSQLCommand.getInsertOrderItem(), dataPair.getRight());
+        }
+    }
+}
diff --git a/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/framework/param/ScalingParameterized.java b/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/framework/param/ScalingParameterized.java
index 27e6f1e648a..69b9840c2a0 100644
--- a/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/framework/param/ScalingParameterized.java
+++ b/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/framework/param/ScalingParameterized.java
@@ -31,5 +31,5 @@ public final class ScalingParameterized {
     
     private String dockerImageName;
     
-    private String parentPath;
+    private String scenario;
 }
diff --git a/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/util/TableCrudUtil.java b/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/util/TableCrudUtil.java
index 222ab607b77..f4af6301a93 100644
--- a/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/util/TableCrudUtil.java
+++ b/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/util/TableCrudUtil.java
@@ -55,18 +55,6 @@ public final class TableCrudUtil {
         return Pair.of(orderData, orderItemData);
     }
     
-    /**
-     * Generate MySQL simple insert data.
-     *
-     * @return insert data
-     */
-    public static Pair<Object[], Object[]> generateSimpleInsertData() {
-        long uniqueKey = (Long) SNOWFLAKE_GENERATE.generateKey();
-        int orderId = RANDOM.nextInt(0, 5);
-        int userId = RANDOM.nextInt(0, 5);
-        return Pair.of(new Object[]{uniqueKey, orderId, userId}, new Object[]{uniqueKey, orderId, userId, "OK"});
-    }
-    
     /**
      * Generate PostgreSQL simple insert data.
      *
diff --git a/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/resources/env/common/command.xml b/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/resources/env/common/command.xml
index 41ca8e69b99..5717c0086df 100644
--- a/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/resources/env/common/command.xml
+++ b/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/resources/env/common/command.xml
@@ -16,22 +16,22 @@
   -->
 
 <command>
-    <create-sharding-algorithm>
+    <create-database-sharding-algorithm>
         CREATE SHARDING ALGORITHM database_inline (
         TYPE(NAME=INLINE,PROPERTIES("algorithm-expression"="ds_${user_id % 2}")))
-    </create-sharding-algorithm>
+    </create-database-sharding-algorithm>
     
-    <create-sharding-algorithm>
+    <create-order-sharding-algorithm>
         CREATE SHARDING ALGORITHM t_order_inline (
         TYPE(NAME=INLINE,PROPERTIES("algorithm-expression"="t_order_${order_id % 2}")))
-    </create-sharding-algorithm>
+    </create-order-sharding-algorithm>
     
-    <create-sharding-algorithm>
+    <create-order-item-sharding-algorithm>
         CREATE SHARDING ALGORITHM t_order_item_inline (
         TYPE(NAME=INLINE,PROPERTIES("algorithm-expression"="t_order_item_${order_id % 2}")))
-    </create-sharding-algorithm>
+    </create-order-item-sharding-algorithm>
     
-    <create-sharding-table>
+    <create-all-sharding-table-rule>
         CREATE SHARDING TABLE RULE t_order (
         DATANODES("ds_${0..1}.t_order_${0..1}"),
         DATABASE_STRATEGY(TYPE=standard,SHARDING_COLUMN=user_id,SHARDING_ALGORITHM=database_inline),
@@ -41,8 +41,18 @@
         DATANODES("ds_${0..1}.t_order_item_${0..1}"),
         DATABASE_STRATEGY(TYPE=standard,SHARDING_COLUMN=user_id,SHARDING_ALGORITHM=database_inline),
         TABLE_STRATEGY(TYPE=standard,SHARDING_COLUMN=order_id,SHARDING_ALGORITHM=t_order_item_inline),
-        KEY_GENERATE_STRATEGY(COLUMN=order_item_id,TYPE(NAME=snowflake)))
-    </create-sharding-table>
+        KEY_GENERATE_STRATEGY(COLUMN=order_item_id,TYPE(NAME=snowflake))
+        )
+    </create-all-sharding-table-rule>
+    
+    <!--
+        DistSQL that creates the sharding table rule for t_order only:
+        data nodes ds_${0..1}.t_order_${0..1}, database sharding on user_id via database_inline,
+        table sharding on order_id via t_order_inline, snowflake key generation for order_id.
+    -->
+    <create-order-sharding-table-rule>
+        CREATE SHARDING TABLE RULE t_order (
+        DATANODES("ds_${0..1}.t_order_${0..1}"),
+        DATABASE_STRATEGY(TYPE=standard,SHARDING_COLUMN=user_id,SHARDING_ALGORITHM=database_inline),
+        TABLE_STRATEGY(TYPE=standard,SHARDING_COLUMN=order_id,SHARDING_ALGORITHM=t_order_inline),
+        KEY_GENERATE_STRATEGY(COLUMN=order_id,TYPE(NAME=snowflake))
+        )
+    </create-order-sharding-table-rule>
     
     <alter-sharding-algorithm>
         ALTER SHARDING ALGORITHM database_inline
diff --git a/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/resources/env/scenario/manual/mysql/sql.xml b/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/resources/env/scenario/manual/mysql/integer_primary_key/sql.xml
similarity index 100%
copy from shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/resources/env/scenario/manual/mysql/sql.xml
copy to shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/resources/env/scenario/manual/mysql/integer_primary_key/sql.xml
diff --git a/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/resources/env/scenario/manual/mysql/sql.xml b/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/resources/env/scenario/manual/mysql/text_primary_key/sql.xml
similarity index 62%
rename from shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/resources/env/scenario/manual/mysql/sql.xml
rename to shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/resources/env/scenario/manual/mysql/text_primary_key/sql.xml
index ddb1734d791..972c049c6ec 100644
--- a/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/resources/env/scenario/manual/mysql/sql.xml
+++ b/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/resources/env/scenario/manual/mysql/text_primary_key/sql.xml
@@ -17,20 +17,10 @@
 <command>
     <create-table-order>
         CREATE TABLE `t_order` (
-        `id` BIGINT NOT NULL COMMENT 'pk id',
+        `id` varchar(255) NOT NULL COMMENT 'pk id',
         `order_id` INT NOT NULL,
         `user_id` INT NOT NULL,
-        `t_varchar` VARCHAR ( 255 ) NULL,
-        `t_tinyint` TINYINT ( 1 ) NULL,
-        `t_timestamp` TIMESTAMP NULL,
-        `t_datetime` datetime NULL ON UPDATE CURRENT_TIMESTAMP,
-        `t_binary` BINARY(255) NULL,
-        `t_blob` BLOB NULL,
-        `t_decimal` DECIMAL ( 10, 2 ) NULL,
-        `t_char` CHAR ( 128 ) NULL,
-        `t_double` DOUBLE NULL,
-        `t_json` json NULL COMMENT 'json test',
-        `t_unsigned_int` int UNSIGNED NULL,
+        `update_date` DATETIME NULL,
         PRIMARY KEY ( `id` ),
         INDEX ( `order_id` )
         ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
@@ -46,26 +36,6 @@
         ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
     </create-table-order-item>
     
-    <full-insert-order>
-        INSERT INTO t_order (
-        id,
-        order_id,
-        user_id,
-        t_varchar,
-        t_tinyint,
-        t_timestamp,
-        t_datetime,
-        t_binary,
-        t_blob,
-        t_decimal,
-        t_char,
-        t_double,
-        t_json,
-        t_unsigned_int
-        )
-        VALUES (?,?,?,?,?,?,?,?,?,?,?,?,?,?)
-    </full-insert-order>
-    
     <insert-order>
         INSERT INTO t_order (id,order_id,user_id) VALUES (?, ?, ?)
     </insert-order>
@@ -74,14 +44,6 @@
         INSERT INTO t_order_item(item_id,order_id,user_id,status) VALUES(?,?,?,?)
     </insert-order-item>
     
-    <update-order-by-id>
-        UPDATE t_order SET t_varchar = ?,t_unsigned_int = ? WHERE id = ?
-    </update-order-by-id>
-    
-    <update-order-item-by-id>
-        UPDATE t_order_item SET status = ? WHERE item_id = ?
-    </update-order-item-by-id>
-    
     <delete-order-by-id>
         DELETE FROM t_order WHERE id = ?
     </delete-order-by-id>
diff --git a/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/resources/env/scenario/manual/postgresql/sql.xml b/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/resources/env/scenario/manual/postgresql/integer_primary_key/sql.xml
similarity index 100%
copy from shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/resources/env/scenario/manual/postgresql/sql.xml
copy to shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/resources/env/scenario/manual/postgresql/integer_primary_key/sql.xml
diff --git a/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/resources/env/scenario/manual/postgresql/sql.xml b/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/resources/env/scenario/manual/postgresql/text_primary_key/sql.xml
similarity index 50%
rename from shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/resources/env/scenario/manual/postgresql/sql.xml
rename to shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/resources/env/scenario/manual/postgresql/text_primary_key/sql.xml
index db800537266..fdc7bbb7c17 100644
--- a/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/resources/env/scenario/manual/postgresql/sql.xml
+++ b/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/resources/env/scenario/manual/postgresql/text_primary_key/sql.xml
@@ -17,49 +17,18 @@
 <command>
     <create-table-order>
         CREATE TABLE test.t_order (
-        id int8 NOT NULL,
+        id varchar(255) NOT NULL,
         order_id int4 NOT NULL,
         user_id int NOT NULL,
-        t_varchar varchar(50) DEFAULT NULL,
         PRIMARY KEY (id)
         )
     </create-table-order>
     
-    <create-table-order-item>
-        CREATE TABLE test.t_order_item (
-        item_id int8 NOT NULL,
-        order_id int4 NOT NULL,
-        user_id int8 NOT NULL,
-        status varchar(50),
-        PRIMARY KEY (item_id)
-        )
-    </create-table-order-item>
-    
     <full-insert-order>
-        INSERT INTO test.t_order(id, order_id, user_id, t_varchar) VALUES (?,?,?,?)
+        INSERT INTO t_order (id,order_id,user_id) VALUES (?, ?, ?)
     </full-insert-order>
     
-    <insert-order>
-        INSERT INTO test.t_order ( id, order_id, user_id) VALUES (?, ?, ?)
-    </insert-order>
-    
-    <insert-order-item>
-        INSERT INTO test.t_order_item(item_id,order_id,user_id,status) VALUES(?,?,?,?)
-    </insert-order-item>
-    
-    <update-order-by-id>
-        UPDATE test.t_order SET t_varchar = ? WHERE id = ?
-    </update-order-by-id>
-    
-    <update-order-item-by-id>
-        UPDATE test.t_order_item SET status = ? WHERE item_id = ?
-    </update-order-item-by-id>
-    
     <delete-order-by-id>
-        DELETE FROM test.t_order WHERE id = ?
+        DELETE FROM t_order WHERE id = ?
     </delete-order-by-id>
-    
-    <delete-order-item-by-id>
-        DELETE FROM test.t_order_item WHERE item_id = ?
-    </delete-order-item-by-id>
 </command>