You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@shardingsphere.apache.org by zh...@apache.org on 2022/05/20 11:01:15 UTC

[shardingsphere] branch master updated: Use PipelineDDLGenerator replace mysql and opengauss getting create table sql (#17822)

This is an automated email from the ASF dual-hosted git repository.

zhonghongsheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new 852b0047ffa Use PipelineDDLGenerator replace mysql and opengauss getting create table sql  (#17822)
852b0047ffa is described below

commit 852b0047ffa24a971ee46d2e97beec7e14f1a6e6
Author: azexcy <10...@users.noreply.github.com>
AuthorDate: Fri May 20 19:01:06 2022 +0800

    Use PipelineDDLGenerator replace mysql and opengauss getting create table sql  (#17822)
---
 .../pipeline/core/execute/PipelineJobExecutor.java |   1 +
 .../datasource/AbstractDataSourcePreparer.java     |   8 --
 .../datasource/MySQLDataSourcePreparer.java        |  40 +++----
 .../datasource/OpenGaussDataSourcePreparer.java    | 130 +++------------------
 .../datasource/PostgreSQLDataSourcePreparer.java   |   5 +-
 .../data/pipeline/cases/base/BaseITCase.java       |   5 +-
 .../src/test/resources/env/logback.xml             |   2 -
 .../src/test/resources/logback-test.xml            |   1 +
 8 files changed, 36 insertions(+), 156 deletions(-)

diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/execute/PipelineJobExecutor.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/execute/PipelineJobExecutor.java
index f90d9ae1093..9dd43a3c79a 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/execute/PipelineJobExecutor.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/execute/PipelineJobExecutor.java
@@ -125,5 +125,6 @@ public final class PipelineJobExecutor extends AbstractLifecycleExecutor {
     @Override
     protected void doStop() {
         executor.shutdown();
+        executor.shutdownNow();
     }
 }
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/prepare/datasource/AbstractDataSourcePreparer.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/prepare/datasource/AbstractDataSourcePreparer.java
index 99ebc1c9054..73e134b6a8e 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/prepare/datasource/AbstractDataSourcePreparer.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/prepare/datasource/AbstractDataSourcePreparer.java
@@ -23,7 +23,6 @@ import org.apache.shardingsphere.data.pipeline.api.config.rulealtered.RuleAltere
 import org.apache.shardingsphere.data.pipeline.api.config.rulealtered.TaskConfiguration;
 import org.apache.shardingsphere.data.pipeline.api.datasource.PipelineDataSourceWrapper;
 import org.apache.shardingsphere.data.pipeline.api.datasource.config.PipelineDataSourceConfigurationFactory;
-import org.apache.shardingsphere.data.pipeline.api.prepare.datasource.ActualTableDefinition;
 import org.apache.shardingsphere.data.pipeline.api.prepare.datasource.TableDefinitionSQLType;
 import org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceManager;
 import org.apache.shardingsphere.data.pipeline.core.exception.PipelineJobPrepareFailedException;
@@ -35,12 +34,9 @@ import org.apache.shardingsphere.infra.database.type.DatabaseTypeFactory;
 import java.sql.Connection;
 import java.sql.SQLException;
 import java.sql.Statement;
-import java.util.Arrays;
-import java.util.Collection;
 import java.util.HashSet;
 import java.util.Set;
 import java.util.regex.Pattern;
-import java.util.stream.Collectors;
 
 /**
  * Abstract data source preparer.
@@ -125,10 +121,6 @@ public abstract class AbstractDataSourcePreparer implements DataSourcePreparer {
         }
     }
     
-    protected final Collection<String> splitTableDefinitionToSQLs(final ActualTableDefinition actualTableDefinition) {
-        return Arrays.stream(actualTableDefinition.getTableDefinition().split(";")).collect(Collectors.toList());
-    }
-    
     // TODO simple lexer
     protected final TableDefinitionSQLType getTableDefinitionSQLType(final String sql) {
         if (PATTERN_CREATE_TABLE.matcher(sql).find()) {
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/prepare/datasource/MySQLDataSourcePreparer.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/prepare/datasource/MySQLDataSourcePreparer.java
index 76f881fc653..5a958aa1b1b 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/prepare/datasource/MySQLDataSourcePreparer.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/prepare/datasource/MySQLDataSourcePreparer.java
@@ -18,21 +18,19 @@
 package org.apache.shardingsphere.data.pipeline.mysql.prepare.datasource;
 
 import lombok.extern.slf4j.Slf4j;
-import org.apache.shardingsphere.data.pipeline.api.config.rulealtered.RuleAlteredJobConfiguration;
 import org.apache.shardingsphere.data.pipeline.api.datanode.JobDataNodeEntry;
+import org.apache.shardingsphere.data.pipeline.core.context.PipelineContext;
 import org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceManager;
 import org.apache.shardingsphere.data.pipeline.core.exception.PipelineJobPrepareFailedException;
+import org.apache.shardingsphere.data.pipeline.core.metadata.generator.PipelineDDLGenerator;
 import org.apache.shardingsphere.data.pipeline.core.prepare.datasource.AbstractDataSourcePreparer;
 import org.apache.shardingsphere.data.pipeline.core.prepare.datasource.PrepareTargetTablesParameter;
-import org.apache.shardingsphere.data.pipeline.core.sqlbuilder.PipelineSQLBuilderFactory;
-import org.apache.shardingsphere.data.pipeline.mysql.sqlbuilder.MySQLPipelineSQLBuilder;
+import org.apache.shardingsphere.infra.database.type.dialect.MySQLDatabaseType;
 
 import java.sql.Connection;
-import java.sql.ResultSet;
 import java.sql.SQLException;
-import java.sql.Statement;
-import java.util.Collection;
-import java.util.stream.Collectors;
+import java.util.LinkedList;
+import java.util.List;
 
 /**
  * Data source preparer for MySQL.
@@ -40,20 +38,12 @@ import java.util.stream.Collectors;
 @Slf4j
 public final class MySQLDataSourcePreparer extends AbstractDataSourcePreparer {
     
-    private static final MySQLPipelineSQLBuilder SQL_BUILDER = (MySQLPipelineSQLBuilder) PipelineSQLBuilderFactory.getInstance("MySQL");
-    
     @Override
     public void prepareTargetTables(final PrepareTargetTablesParameter parameter) {
-        RuleAlteredJobConfiguration jobConfig = parameter.getJobConfig();
         PipelineDataSourceManager dataSourceManager = parameter.getDataSourceManager();
-        try (
-                Connection sourceConnection = getSourceCachedDataSource(jobConfig, dataSourceManager).getConnection();
-                Connection targetConnection = getTargetCachedDataSource(parameter.getTaskConfig(), dataSourceManager).getConnection()) {
-            Collection<String> logicTableNames = parameter.getTablesFirstDataNodes().getEntries().stream().map(JobDataNodeEntry::getLogicTableName).collect(Collectors.toList());
-            for (String each : logicTableNames) {
-                String createTableSQL = getCreateTableSQL(sourceConnection, each);
-                createTableSQL = addIfNotExistsForCreateTableSQL(createTableSQL);
-                executeTargetTableSQL(targetConnection, createTableSQL);
+        try (Connection targetConnection = getTargetCachedDataSource(parameter.getTaskConfig(), dataSourceManager).getConnection()) {
+            for (String each : getCreateTableSQL(parameter)) {
+                executeTargetTableSQL(targetConnection, addIfNotExistsForCreateTableSQL(each));
                 log.info("create target table '{}' success", each);
             }
         } catch (final SQLException ex) {
@@ -61,13 +51,13 @@ public final class MySQLDataSourcePreparer extends AbstractDataSourcePreparer {
         }
     }
     
-    private String getCreateTableSQL(final Connection sourceConnection, final String logicTableName) throws SQLException {
-        String showCreateTableSQL = "SHOW CREATE TABLE " + SQL_BUILDER.quote(logicTableName);
-        try (Statement statement = sourceConnection.createStatement(); ResultSet resultSet = statement.executeQuery(showCreateTableSQL)) {
-            if (!resultSet.next()) {
-                throw new PipelineJobPrepareFailedException("show create table has no result, sql: " + showCreateTableSQL);
-            }
-            return resultSet.getString(2);
+    private List<String> getCreateTableSQL(final PrepareTargetTablesParameter parameter) {
+        PipelineDDLGenerator generator = new PipelineDDLGenerator(PipelineContext.getContextManager());
+        List<String> result = new LinkedList<>();
+        for (JobDataNodeEntry each : parameter.getTablesFirstDataNodes().getEntries()) {
+            String schemaName = parameter.getTableNameSchemaNameMapping().getSchemaName(each.getLogicTableName());
+            result.add(generator.generateLogicDDLSQL(new MySQLDatabaseType(), parameter.getJobConfig().getDatabaseName(), schemaName, each.getLogicTableName()));
         }
+        return result;
     }
 }
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/prepare/datasource/OpenGaussDataSourcePreparer.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/prepare/datasource/OpenGaussDataSourcePreparer.java
index a3dd652e917..20b6776045c 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/prepare/datasource/OpenGaussDataSourcePreparer.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/prepare/datasource/OpenGaussDataSourcePreparer.java
@@ -17,31 +17,21 @@
 
 package org.apache.shardingsphere.data.pipeline.opengauss.prepare.datasource;
 
+import com.google.common.base.Splitter;
 import lombok.extern.slf4j.Slf4j;
-import org.apache.shardingsphere.data.pipeline.api.config.TableNameSchemaNameMapping;
+import org.apache.commons.lang3.StringUtils;
 import org.apache.shardingsphere.data.pipeline.api.datanode.JobDataNodeEntry;
-import org.apache.shardingsphere.data.pipeline.api.datasource.PipelineDataSourceWrapper;
-import org.apache.shardingsphere.data.pipeline.api.datasource.config.PipelineDataSourceConfigurationFactory;
-import org.apache.shardingsphere.data.pipeline.api.datasource.config.impl.ShardingSpherePipelineDataSourceConfiguration;
-import org.apache.shardingsphere.data.pipeline.api.prepare.datasource.ActualTableDefinition;
-import org.apache.shardingsphere.data.pipeline.api.prepare.datasource.TableDefinitionSQLType;
-import org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceManager;
+import org.apache.shardingsphere.data.pipeline.core.context.PipelineContext;
 import org.apache.shardingsphere.data.pipeline.core.exception.PipelineJobPrepareFailedException;
+import org.apache.shardingsphere.data.pipeline.core.metadata.generator.PipelineDDLGenerator;
 import org.apache.shardingsphere.data.pipeline.core.prepare.datasource.AbstractDataSourcePreparer;
 import org.apache.shardingsphere.data.pipeline.core.prepare.datasource.PrepareTargetTablesParameter;
-import org.apache.shardingsphere.infra.datanode.DataNode;
+import org.apache.shardingsphere.infra.database.type.dialect.OpenGaussDatabaseType;
 
 import java.sql.Connection;
-import java.sql.ResultSet;
 import java.sql.SQLException;
-import java.sql.Statement;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.HashMap;
 import java.util.LinkedList;
 import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
 import java.util.stream.Collectors;
 
 /**
@@ -50,119 +40,27 @@ import java.util.stream.Collectors;
 @Slf4j
 public final class OpenGaussDataSourcePreparer extends AbstractDataSourcePreparer {
     
-    private static final String WITH_OF_TABLE_EXTEND = "with (";
-    
     @Override
     public void prepareTargetTables(final PrepareTargetTablesParameter parameter) {
-        Collection<ActualTableDefinition> actualTableDefinitions;
-        try {
-            actualTableDefinitions = getActualTableDefinitions(parameter);
-        } catch (final SQLException ex) {
-            throw new PipelineJobPrepareFailedException("get table definitions failed.", ex);
-        }
-        Map<String, Collection<String>> createLogicTableSQLs = getCreateLogicTableSQLs(actualTableDefinitions, parameter.getTableNameSchemaNameMapping());
+        List<String> createLogicalTableSQLs = listCreateLogicalTableSQL(parameter);
         try (Connection targetConnection = getTargetCachedDataSource(parameter.getTaskConfig(), parameter.getDataSourceManager()).getConnection()) {
-            for (Entry<String, Collection<String>> entry : createLogicTableSQLs.entrySet()) {
-                for (String each : entry.getValue()) {
+            for (String createLogicalTableSQL : createLogicalTableSQLs) {
+                for (String each : Splitter.on(";").splitToList(createLogicalTableSQL).stream().filter(StringUtils::isNotBlank).collect(Collectors.toList())) {
                     executeTargetTableSQL(targetConnection, each);
                 }
-                log.info("create target table '{}' success", entry.getKey());
             }
         } catch (final SQLException ex) {
             throw new PipelineJobPrepareFailedException("prepare target tables failed.", ex);
         }
     }
     
-    private Collection<ActualTableDefinition> getActualTableDefinitions(final PrepareTargetTablesParameter parameter) throws SQLException {
-        Collection<ActualTableDefinition> result = new LinkedList<>();
-        ShardingSpherePipelineDataSourceConfiguration sourceDataSourceConfig = (ShardingSpherePipelineDataSourceConfiguration) PipelineDataSourceConfigurationFactory.newInstance(
-                parameter.getJobConfig().getSource().getType(), parameter.getJobConfig().getSource().getParameter());
-        // TODO reuse PipelineDataSourceManager
-        try (PipelineDataSourceManager dataSourceManager = new PipelineDataSourceManager()) {
-            for (JobDataNodeEntry each : parameter.getTablesFirstDataNodes().getEntries()) {
-                DataNode dataNode = each.getDataNodes().get(0);
-                // Keep dataSource to reuse
-                PipelineDataSourceWrapper dataSource = dataSourceManager.getDataSource(sourceDataSourceConfig.getActualDataSourceConfig(dataNode.getDataSourceName()));
-                try (Connection sourceConnection = dataSource.getConnection()) {
-                    String schemaName = parameter.getTableNameSchemaNameMapping().getSchemaName(each.getLogicTableName());
-                    String actualTableName = dataNode.getTableName();
-                    String tableDefinition = queryTableDefinition(sourceConnection, schemaName, actualTableName);
-                    log.info("getActualTableDefinitions, schemaName={}, dataNode={}, tableDefinition={}", schemaName, dataNode, tableDefinition);
-                    String logicTableName = each.getLogicTableName();
-                    result.add(new ActualTableDefinition(logicTableName, actualTableName, tableDefinition));
-                }
-            }
-        }
-        return result;
-    }
-    
-    private String queryTableDefinition(final Connection sourceConnection, final String schemaName, final String actualTableName) throws SQLException {
-        String sql = String.format("SELECT * FROM pg_get_tabledef('%s.%s'::regclass::oid)", schemaName, actualTableName);
-        log.info("queryTableDefinition, sql={}", sql);
-        try (Statement statement = sourceConnection.createStatement(); ResultSet resultSet = statement.executeQuery(sql)) {
-            if (!resultSet.next()) {
-                throw new PipelineJobPrepareFailedException("table definition has no result, sql: " + sql);
-            }
-            return resultSet.getString(1);
-        }
-    }
-    
-    /**
-     * Get create logic table SQLs.
-     *
-     * @param actualTableDefinitions actual table definitions. key is logic table name, value is actual table definition.
-     * @return all SQLs. key is logic table name, value is collection of logic table SQLs.
-     */
-    private Map<String, Collection<String>> getCreateLogicTableSQLs(final Collection<ActualTableDefinition> actualTableDefinitions, final TableNameSchemaNameMapping tableNameSchemaNameMapping) {
-        Map<String, Collection<String>> result = new HashMap<>();
-        for (ActualTableDefinition each : actualTableDefinitions) {
-            String schemaName = tableNameSchemaNameMapping.getSchemaName(each.getLogicTableName());
-            Collection<String> logicTableSQLs = splitTableDefinitionToSQLs(each).stream().map(sql -> {
-                TableDefinitionSQLType sqlType = getTableDefinitionSQLType(sql);
-                switch (sqlType) {
-                    // TODO replace constraint and index name
-                    case CREATE_TABLE:
-                        sql = addIfNotExistsForCreateTableSQL(sql);
-                        sql = appendSchemaName(sql, each.getActualTableName(), schemaName);
-                        sql = replaceActualTableNameToLogicTableName(sql, each.getActualTableName(), each.getLogicTableName());
-                        sql = skipCreateTableExtendSet(sql);
-                        return sql;
-                    case ALTER_TABLE:
-                        sql = appendSchemaName(sql, each.getActualTableName(), schemaName);
-                        return replaceActualTableNameToLogicTableName(sql, each.getActualTableName(), each.getLogicTableName());
-                    default:
-                        return "";
-                }
-            }).filter(sql -> !"".equals(sql)).collect(Collectors.toList());
-            result.put(each.getLogicTableName(), logicTableSQLs);
+    private List<String> listCreateLogicalTableSQL(final PrepareTargetTablesParameter parameter) {
+        PipelineDDLGenerator generator = new PipelineDDLGenerator(PipelineContext.getContextManager());
+        List<String> result = new LinkedList<>();
+        for (JobDataNodeEntry each : parameter.getTablesFirstDataNodes().getEntries()) {
+            String schemaName = parameter.getTableNameSchemaNameMapping().getSchemaName(each.getLogicTableName());
+            result.add(generator.generateLogicDDLSQL(new OpenGaussDatabaseType(), parameter.getJobConfig().getDatabaseName(), schemaName, each.getLogicTableName()));
         }
         return result;
     }
-    
-    private String appendSchemaName(final String createOrAlterTableSQL, final String actualTableName, final String schemaName) {
-        int start = createOrAlterTableSQL.indexOf(actualTableName);
-        if (start <= 0) {
-            return createOrAlterTableSQL;
-        }
-        return new StringBuilder(createOrAlterTableSQL).insert(start, schemaName + ".").toString();
-    }
-    
-    private String skipCreateTableExtendSet(final String createSQL) {
-        String lowerCreateSQL = createSQL.toLowerCase();
-        String[] search = {WITH_OF_TABLE_EXTEND, ")"};
-        List<Integer> searchPos = new ArrayList<>(2);
-        int startPos = 0;
-        for (String each : search) {
-            int curSearch = lowerCreateSQL.indexOf(each, startPos);
-            if (curSearch <= 0) {
-                break;
-            }
-            searchPos.add(curSearch);
-            startPos = curSearch;
-        }
-        if (searchPos.size() != search.length) {
-            return createSQL;
-        }
-        return createSQL.substring(0, searchPos.get(0)) + createSQL.substring(searchPos.get(1) + 1);
-    }
 }
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/prepare/datasource/PostgreSQLDataSourcePreparer.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/prepare/datasource/PostgreSQLDataSourcePreparer.java
index 8800a400976..8168da6990e 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/prepare/datasource/PostgreSQLDataSourcePreparer.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/prepare/datasource/PostgreSQLDataSourcePreparer.java
@@ -26,7 +26,6 @@ import org.apache.shardingsphere.data.pipeline.core.exception.PipelineJobPrepare
 import org.apache.shardingsphere.data.pipeline.core.metadata.generator.PipelineDDLGenerator;
 import org.apache.shardingsphere.data.pipeline.core.prepare.datasource.AbstractDataSourcePreparer;
 import org.apache.shardingsphere.data.pipeline.core.prepare.datasource.PrepareTargetTablesParameter;
-import org.apache.shardingsphere.infra.database.type.DatabaseType;
 import org.apache.shardingsphere.infra.database.type.dialect.PostgreSQLDatabaseType;
 
 import java.sql.Connection;
@@ -41,8 +40,6 @@ import java.util.stream.Collectors;
 @Slf4j
 public final class PostgreSQLDataSourcePreparer extends AbstractDataSourcePreparer {
     
-    private static final DatabaseType DATABASE_TYPE = new PostgreSQLDatabaseType();
-    
     @Override
     public void prepareTargetTables(final PrepareTargetTablesParameter parameter) {
         List<String> createLogicalTableSQLs = listCreateLogicalTableSQL(parameter);
@@ -62,7 +59,7 @@ public final class PostgreSQLDataSourcePreparer extends AbstractDataSourcePrepar
         List<String> result = new LinkedList<>();
         for (JobDataNodeEntry each : parameter.getTablesFirstDataNodes().getEntries()) {
             String schemaName = parameter.getTableNameSchemaNameMapping().getSchemaName(each.getLogicTableName());
-            result.add(generator.generateLogicDDLSQL(DATABASE_TYPE, parameter.getJobConfig().getDatabaseName(), schemaName, each.getLogicTableName()));
+            result.add(generator.generateLogicDDLSQL(new PostgreSQLDatabaseType(), parameter.getJobConfig().getDatabaseName(), schemaName, each.getLogicTableName()));
         }
         return result;
     }
diff --git a/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/base/BaseITCase.java b/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/base/BaseITCase.java
index d726d388b1c..af08177c3c6 100644
--- a/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/base/BaseITCase.java
+++ b/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/base/BaseITCase.java
@@ -225,7 +225,10 @@ public abstract class BaseITCase {
             increaseTaskThread.join(60 * 1000L);
         }
         TimeUnit.SECONDS.sleep(2);
-        String jobId = String.valueOf(getJdbcTemplate().queryForMap("SHOW SCALING LIST").get("id"));
+        List<Map<String, Object>> scalingListMap = getJdbcTemplate().queryForList("SHOW SCALING LIST");
+        assertThat(scalingListMap.size(), is(1));
+        Object jobId = scalingListMap.get(0).get("id");
+        log.info("jobId: {}", jobId);
         Map<String, String> actualStatusMap = new HashMap<>(2, 1);
         for (int i = 0; i < 100; i++) {
             List<Map<String, Object>> showScalingStatusResMap = jdbcTemplate.queryForList(String.format("SHOW SCALING STATUS %s", jobId));
diff --git a/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/resources/env/logback.xml b/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/resources/env/logback.xml
index 281fafeae6a..487879aca84 100644
--- a/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/resources/env/logback.xml
+++ b/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/resources/env/logback.xml
@@ -34,9 +34,7 @@
     <logger name="org.apache.shardingsphere.mode.manager.cluster.coordinator.lock" level="DEBUG" additivity="false">
         <appender-ref ref="console" />
     </logger>
-    <logger name="org.testcontainers.containers.ContainerState" level="OFF" />
     <logger name="com.zaxxer.hikari.pool.ProxyConnection" level="OFF" />
-    <logger name="com.atomikos.icatch.provider" level="OFF" />
     <root>
         <level value="WARN" />
         <appender-ref ref="console" />
diff --git a/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/resources/logback-test.xml b/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/resources/logback-test.xml
index b13e574bb64..200d454e788 100644
--- a/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/resources/logback-test.xml
+++ b/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/resources/logback-test.xml
@@ -25,6 +25,7 @@
     <logger name="org.springframework.jdbc.core.JdbcTemplate" level="DEBUG" additivity="false">
         <appender-ref ref="console" />
     </logger>
+    <logger name="org.apache.shardingsphere.test.integration.framework.container.atomic.DockerITContainer" level="WARN" />
     <logger name="com.zaxxer.hikari.pool.ProxyConnection" level="OFF" />
     <root level="INFO">
         <appender-ref ref="console" />