You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@shardingsphere.apache.org by zh...@apache.org on 2022/05/10 02:53:54 UTC

[shardingsphere] branch master updated: Use PipelineDDLGenerator to generate postgres ddl (#17469)

This is an automated email from the ASF dual-hosted git repository.

zhonghongsheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new 566b87e00b9  Use PipelineDDLGenerator to generate postgres ddl (#17469)
566b87e00b9 is described below

commit 566b87e00b946ad7c585538a16c51484bbe4daf8
Author: azexcy <10...@users.noreply.github.com>
AuthorDate: Tue May 10 10:53:48 2022 +0800

     Use PipelineDDLGenerator to generate postgres ddl (#17469)
    
    * Fix OID query execution in a different schema
    
    * Use PipelineDDLGenerator to generate postgres ddl
    
    * Fix codestyle
    
    * Add warn log
    
    * Code optimization
    
    * Fix host debug port.
    
    * Support multiple statements
    
    * Change Freemarker Configuration to use ClassTemplateLoader
    
    * Fix action
    
    * Change log level
---
 .../datasource/PostgreSQLDataSourcePreparer.java   | 195 ++-------------------
 .../sqlbuilder/PostgreSQLPipelineSQLBuilder.java   |   3 +-
 .../postgresql/util/FreemarkerManager.java         |   4 +-
 .../template/table/default/get_table_id.ftl        |   2 +-
 .../data/pipeline/cases/BaseITCase.java            |   6 +-
 .../proxy/ShardingSphereProxyDockerContainer.java  |   4 +-
 .../src/test/resources/env/logback.xml             |   4 +-
 7 files changed, 28 insertions(+), 190 deletions(-)

diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/prepare/datasource/PostgreSQLDataSourcePreparer.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/prepare/datasource/PostgreSQLDataSource [...]
index ec7aa448dd4..8800a400976 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/prepare/datasource/PostgreSQLDataSourcePreparer.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/prepare/datasource/PostgreSQLDataSourcePreparer.java
@@ -17,33 +17,22 @@
 
 package org.apache.shardingsphere.data.pipeline.postgresql.prepare.datasource;
 
-import com.google.common.base.Strings;
-import com.google.common.collect.Maps;
+import com.google.common.base.Splitter;
 import lombok.extern.slf4j.Slf4j;
-import org.apache.commons.lang3.tuple.Pair;
+import org.apache.commons.lang3.StringUtils;
 import org.apache.shardingsphere.data.pipeline.api.datanode.JobDataNodeEntry;
-import org.apache.shardingsphere.data.pipeline.api.datasource.PipelineDataSourceWrapper;
-import org.apache.shardingsphere.data.pipeline.api.datasource.config.PipelineDataSourceConfigurationFactory;
-import org.apache.shardingsphere.data.pipeline.api.datasource.config.impl.ShardingSpherePipelineDataSourceConfiguration;
-import org.apache.shardingsphere.data.pipeline.api.prepare.datasource.ActualTableDefinition;
-import org.apache.shardingsphere.data.pipeline.api.prepare.datasource.TableDefinitionSQLType;
-import org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceManager;
+import org.apache.shardingsphere.data.pipeline.core.context.PipelineContext;
 import org.apache.shardingsphere.data.pipeline.core.exception.PipelineJobPrepareFailedException;
+import org.apache.shardingsphere.data.pipeline.core.metadata.generator.PipelineDDLGenerator;
 import org.apache.shardingsphere.data.pipeline.core.prepare.datasource.AbstractDataSourcePreparer;
 import org.apache.shardingsphere.data.pipeline.core.prepare.datasource.PrepareTargetTablesParameter;
-import org.apache.shardingsphere.infra.datanode.DataNode;
+import org.apache.shardingsphere.infra.database.type.DatabaseType;
+import org.apache.shardingsphere.infra.database.type.dialect.PostgreSQLDatabaseType;
 
 import java.sql.Connection;
-import java.sql.ResultSet;
 import java.sql.SQLException;
-import java.sql.Statement;
-import java.util.Collection;
-import java.util.HashMap;
 import java.util.LinkedList;
 import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.StringJoiner;
 import java.util.stream.Collectors;
 
 /**
@@ -52,42 +41,14 @@ import java.util.stream.Collectors;
 @Slf4j
 public final class PostgreSQLDataSourcePreparer extends AbstractDataSourcePreparer {
     
-    private static final String FETCH_CREATE_TABLE_TEMPLATE = "SELECT 'CREATE TABLE IF NOT EXISTS '||'%s'||'('||STRING_AGG"
-            + "(column_list.column_expr, ',')||',PRIMARY KEY({PK_placeholder}))' FROM (SELECT *,column_name||' '||data_type||COALESCE('('"
-            + " || character_maximum_length || ')', '')||CASE WHEN is_nullable = 'YES' then '' ELSE ' NOT NULL' END AS column_expr "
-            + "FROM information_schema.columns WHERE table_name = '%s' AND table_schema='%s' ORDER BY ordinal_position) column_list";
-    
-    private static final String FETCH_NORMAL_INDEXES_TEMPLATE = "SELECT schemaname,indexname,indexdef FROM pg_indexes WHERE tablename = "
-            + "'%s' AND indexname != '%s' AND schemaname='%s'";
-    
-    private static final String DROP_INDEX_TEMPLATE = "DROP INDEX IF EXISTS %s.%s";
-    
-    private static final String FETCH_PRIMARY_KEY_TEMPLATE = "SELECT constraint_name,column_name FROM information_schema"
-            + ".key_column_usage WHERE table_name = '%s' AND table_schema='%s'";
-    
-    private static final String COMMENT_TEMPLATE = "COMMENT ON %s %s IS '%s'";
-    
-    private static final String FETCH_TABLE_COLUMN_TEMPLATE = "SELECT ordinal_position,column_name FROM information_schema.columns WHERE "
-            + "table_name = '%s' AND table_schema='%s' ORDER BY ordinal_position";
-    
-    private static final String FETCH_COMMENT_TEMPLATE = "SELECT objsubid,description FROM pg_catalog.pg_description WHERE objoid = "
-            + "'%s'::regclass";
+    private static final DatabaseType DATABASE_TYPE = new PostgreSQLDatabaseType();
     
     @Override
     public void prepareTargetTables(final PrepareTargetTablesParameter parameter) {
-        Collection<ActualTableDefinition> actualTableDefinitions;
-        try {
-            actualTableDefinitions = getActualTableDefinitions(parameter);
-        } catch (final SQLException ex) {
-            log.error("failed to get actual table definitions", ex);
-            throw new PipelineJobPrepareFailedException("get table definitions failed.", ex);
-        }
-        log.info("prepareTargetTables, actualTableDefinitions={}", actualTableDefinitions);
-        Map<String, Collection<String>> createLogicTableSQLs = getCreateLogicTableSQLs(actualTableDefinitions);
-        try (
-                Connection targetConnection = getTargetCachedDataSource(parameter.getTaskConfig(), parameter.getDataSourceManager()).getConnection()) {
-            for (Entry<String, Collection<String>> entry : createLogicTableSQLs.entrySet()) {
-                for (String each : entry.getValue()) {
+        List<String> createLogicalTableSQLs = listCreateLogicalTableSQL(parameter);
+        try (Connection targetConnection = getTargetCachedDataSource(parameter.getTaskConfig(), parameter.getDataSourceManager()).getConnection()) {
+            for (String createLogicalTableSQL : createLogicalTableSQLs) {
+                for (String each : Splitter.on(";").splitToList(createLogicalTableSQL).stream().filter(StringUtils::isNotBlank).collect(Collectors.toList())) {
                     executeTargetTableSQL(targetConnection, each);
                 }
             }
@@ -96,136 +57,12 @@ public final class PostgreSQLDataSourcePreparer extends AbstractDataSourcePrepar
         }
     }
     
-    private Map<String, Collection<String>> getCreateLogicTableSQLs(final Collection<ActualTableDefinition> actualTableDefinitions) {
-        Map<String, Collection<String>> result = new HashMap<>(actualTableDefinitions.size(), 1);
-        for (ActualTableDefinition each : actualTableDefinitions) {
-            Collection<String> logicTableSQLs = splitTableDefinitionToSQLs(each).stream().map(sql -> {
-                TableDefinitionSQLType sqlType = getTableDefinitionSQLType(sql);
-                switch (sqlType) {
-                    case CREATE_TABLE:
-                    case ALTER_TABLE:
-                        return replaceActualTableNameToLogicTableName(sql, each.getActualTableName(), each.getLogicTableName());
-                    case CREATE_INDEX:
-                        return sql.replace(each.getActualTableName(), each.getLogicTableName());
-                    case DROP_INDEX:
-                        return sql.replace(each.getActualTableName(), each.getLogicTableName());
-                    case COMMENT_ON:
-                        return sql.replace(each.getActualTableName(), each.getLogicTableName());
-                    default:
-                        return "";
-                }
-            }).filter(sql -> !Strings.isNullOrEmpty(sql)).collect(Collectors.toList());
-            result.put(each.getLogicTableName(), logicTableSQLs);
-        }
-        return result;
-    }
-    
-    private Collection<ActualTableDefinition> getActualTableDefinitions(final PrepareTargetTablesParameter parameter) throws SQLException {
-        Collection<ActualTableDefinition> result = new LinkedList<>();
-        ShardingSpherePipelineDataSourceConfiguration sourceDataSourceConfig =
-                (ShardingSpherePipelineDataSourceConfiguration) PipelineDataSourceConfigurationFactory.newInstance(parameter.getJobConfig().getSource().getType(),
-                        parameter.getJobConfig().getSource().getParameter());
-        try (PipelineDataSourceManager dataSourceManager = new PipelineDataSourceManager()) {
-            for (JobDataNodeEntry each : parameter.getTablesFirstDataNodes().getEntries()) {
-                DataNode dataNode = each.getDataNodes().get(0);
-                // TODO to remove if config is fix the problem
-                final Map<String, Object> configMap =
-                        sourceDataSourceConfig.getRootConfig().getDataSources().get(dataNode.getDataSourceName());
-                // TODO one of jdbcUrl or url might not exist, refactor to getActualDataSourceConfig
-                configMap.put("jdbcUrl", configMap.get("url"));
-                PipelineDataSourceWrapper dataSource =
-                        dataSourceManager.getDataSource(sourceDataSourceConfig.getActualDataSourceConfig(dataNode.getDataSourceName()));
-                try (Connection sourceConnection = dataSource.getConnection()) {
-                    String actualTableName = dataNode.getTableName();
-                    String schemaName = parameter.getTableNameSchemaNameMapping().getSchemaName(each.getLogicTableName());
-                    StringJoiner joiner = new StringJoiner(";");
-                    Pair<String, List<String>> primaryKeyPair = queryTablePrimaryKey(sourceConnection, schemaName, actualTableName);
-                    joiner.add(queryCreateTableSql(sourceConnection, schemaName, actualTableName, primaryKeyPair.getRight()));
-                    queryCreateIndexes(sourceConnection, schemaName, actualTableName, primaryKeyPair.getLeft()).forEach(joiner::add);
-                    // TODO support query comment for multi schema
-                    // queryCommentOnList(sourceConnection, schemaName, actualTableName).forEach(joiner::add);
-                    String tableDefinition = joiner.toString();
-                    result.add(new ActualTableDefinition(each.getLogicTableName(), actualTableName, tableDefinition));
-                }
-            }
-        }
-        return result;
-    }
-    
-    private Pair<String, List<String>> queryTablePrimaryKey(final Connection sourceConnection, final String schemaName, final String actualTableName) throws SQLException {
-        String sql = String.format(FETCH_PRIMARY_KEY_TEMPLATE, actualTableName, schemaName);
-        log.info("queryTablePrimaryKey, sql={}", sql);
-        String primaryKeyName = null;
-        List<String> primaryKeyColumns = new LinkedList<>();
-        try (
-                Statement statement = sourceConnection.createStatement();
-                ResultSet resultSet = statement.executeQuery(sql)) {
-            while (resultSet.next()) {
-                primaryKeyName = primaryKeyName == null ? resultSet.getString(1) : primaryKeyName;
-                primaryKeyColumns.add(resultSet.getString(2));
-            }
-            if (primaryKeyColumns.size() == 0 || primaryKeyName == null) {
-                throw new PipelineJobPrepareFailedException("not support no primary key table:" + actualTableName);
-            }
-        }
-        return Pair.of(primaryKeyName, primaryKeyColumns);
-    }
-    
-    private String queryCreateTableSql(final Connection sourceConnection, final String schemaName, final String actualTableName, final List<String> pkColumns) throws SQLException {
-        final String sql = String.format(FETCH_CREATE_TABLE_TEMPLATE, schemaName + "." + actualTableName, actualTableName, schemaName);
-        log.info("queryCreateTableSql, sql={}", sql);
-        try (Statement statement = sourceConnection.createStatement(); ResultSet resultSet = statement.executeQuery(sql)) {
-            if (!resultSet.next()) {
-                throw new PipelineJobPrepareFailedException("table struct has no result, sql: " + sql);
-            }
-            return resultSet.getString(1).replace("{PK_placeholder}", String.join(",", pkColumns));
-        }
-    }
-    
-    private List<String> queryCreateIndexes(final Connection sourceConnection, final String schemaName, final String actualTableName, final String pkName) throws SQLException {
+    private List<String> listCreateLogicalTableSQL(final PrepareTargetTablesParameter parameter) {
+        PipelineDDLGenerator generator = new PipelineDDLGenerator(PipelineContext.getContextManager());
         List<String> result = new LinkedList<>();
-        String sql = String.format(FETCH_NORMAL_INDEXES_TEMPLATE, actualTableName, pkName, schemaName);
-        log.info("queryCreateIndexes, sql={}", sql);
-        try (
-                Statement statement = sourceConnection.createStatement();
-                ResultSet resultSet = statement.executeQuery(sql)) {
-            while (resultSet.next()) {
-                // TODO add drop index first, make sure the index is not exist
-                result.add(String.format(DROP_INDEX_TEMPLATE, resultSet.getString(1), resultSet.getString(2)));
-                result.add(resultSet.getString(3));
-            }
-        }
-        return result;
-    }
-    
-    private List<String> queryCommentOnList(final Connection sourceConnection, final String schemaName, final String actualTableName) throws SQLException {
-        final String fetchCommentSql = String.format(FETCH_COMMENT_TEMPLATE, actualTableName);
-        log.info("queryCommentOnList, fetchCommentSql={}", fetchCommentSql);
-        List<String> result = new LinkedList<>();
-        Map<Integer, String> commentMap = Maps.newHashMap();
-        try (
-                Statement statement = sourceConnection.createStatement();
-                ResultSet commentResult = statement.executeQuery(fetchCommentSql)) {
-            while (commentResult.next()) {
-                commentMap.put(commentResult.getInt(1), commentResult.getString(2));
-            }
-            String tableComment = commentMap.remove(0);
-            if (!Strings.isNullOrEmpty(tableComment)) {
-                result.add(String.format(COMMENT_TEMPLATE, "TABLE", actualTableName, tableComment));
-            }
-        }
-        final String fetchColumnSql = String.format(FETCH_TABLE_COLUMN_TEMPLATE, actualTableName, schemaName);
-        log.info("queryCommentOnList, fetchColumnSql={}", fetchColumnSql);
-        try (
-                Statement statement = sourceConnection.createStatement();
-                ResultSet columnsResult = statement.executeQuery(fetchColumnSql)) {
-            while (columnsResult.next()) {
-                String columnComment = commentMap.get(columnsResult.getInt(1));
-                if (columnComment != null) {
-                    result.add(String.format(COMMENT_TEMPLATE, "COLUMN",
-                            new StringJoiner(".").add(actualTableName).add(columnsResult.getString(2)), columnComment));
-                }
-            }
+        for (JobDataNodeEntry each : parameter.getTablesFirstDataNodes().getEntries()) {
+            String schemaName = parameter.getTableNameSchemaNameMapping().getSchemaName(each.getLogicTableName());
+            result.add(generator.generateLogicDDLSQL(DATABASE_TYPE, parameter.getJobConfig().getDatabaseName(), schemaName, each.getLogicTableName()));
         }
         return result;
     }
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/sqlbuilder/PostgreSQLPipelineSQLBuilder.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/sqlbuilder/PostgreSQLPipelineSQLBuilder.java
index 937bb5bf070..197c0a2d712 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/sqlbuilder/PostgreSQLPipelineSQLBuilder.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/sqlbuilder/PostgreSQLPipelineSQLBuilder.java
@@ -33,7 +33,8 @@ public final class PostgreSQLPipelineSQLBuilder extends AbstractPipelineSQLBuild
     
     @Override
     public String buildCreateSchemaSQL(final String schemaName) {
-        return "CREATE SCHEMA IF NOT EXISTS " + quote(schemaName);
+        // TODO remove first, add when kernel fixed bug.
+        return "CREATE SCHEMA IF NOT EXISTS " + schemaName;
     }
     
     @Override
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/util/FreemarkerManager.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/util/FreemarkerManager.java
index f6334bcd205..605be87b2b2 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/util/FreemarkerManager.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/util/FreemarkerManager.java
@@ -25,13 +25,11 @@ import lombok.NoArgsConstructor;
 import lombok.SneakyThrows;
 import org.apache.shardingsphere.infra.exception.ShardingSphereException;
 
-import java.io.File;
 import java.io.IOException;
 import java.io.StringWriter;
 import java.util.LinkedHashMap;
 import java.util.Map;
 import java.util.Map.Entry;
-import java.util.Objects;
 import java.util.Optional;
 
 /**
@@ -72,7 +70,7 @@ public final class FreemarkerManager {
     @SneakyThrows
     private Configuration createTemplateConfiguration() {
         Configuration result = new Configuration(Configuration.VERSION_2_3_31);
-        result.setDirectoryForTemplateLoading(new File(Objects.requireNonNull(FreemarkerManager.class.getClassLoader().getResource("template")).getFile()));
+        result.setClassForTemplateLoading(this.getClass(), "/template");
         result.setDefaultEncoding("UTF-8");
         return result;
     }
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/main/resources/template/table/default/get_table_id.ftl b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/main/resources/template/table/default/get_table_id.ftl
index 7853ed437ca..44578d29dea 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/main/resources/template/table/default/get_table_id.ftl
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/main/resources/template/table/default/get_table_id.ftl
@@ -15,4 +15,4 @@
   ~ limitations under the License.
   -->
 
-SELECT tablename::REGCLASS::OID AS tid FROM pg_catalog.pg_tables WHERE schemaname = '${schemaName}' and tablename = '${tableName}';
+SELECT '${schemaName}.${tableName}'::REGCLASS::OID AS tid;
diff --git a/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/BaseITCase.java b/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/BaseITCase.java
index abc6a9e5288..460bc18d34e 100644
--- a/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/BaseITCase.java
+++ b/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/BaseITCase.java
@@ -23,6 +23,7 @@ import lombok.AccessLevel;
 import lombok.Getter;
 import lombok.Setter;
 import lombok.SneakyThrows;
+import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.shardingsphere.data.pipeline.api.job.JobStatus;
 import org.apache.shardingsphere.data.pipeline.core.util.ThreadUtil;
@@ -62,6 +63,7 @@ import static org.hamcrest.CoreMatchers.not;
 import static org.junit.Assert.assertThat;
 import static org.junit.Assert.assertTrue;
 
+@Slf4j
 @Getter(AccessLevel.PROTECTED)
 public abstract class BaseITCase {
     
@@ -159,6 +161,7 @@ public abstract class BaseITCase {
         Map<String, String> actualStatusMap = new HashMap<>(2, 1);
         for (int i = 0; i < 100; i++) {
             List<Map<String, Object>> showScalingStatusResMap = jdbcTemplate.queryForList(String.format("SHOW SCALING STATUS %s", jobId));
+            log.warn("actualStatusMap: {}", actualStatusMap);
             boolean finished = true;
             for (Map<String, Object> entry : showScalingStatusResMap) {
                 String status = entry.get("status").toString();
@@ -174,9 +177,8 @@ public abstract class BaseITCase {
             }
             if (finished) {
                 break;
-            } else {
-                TimeUnit.SECONDS.sleep(2);
             }
+            TimeUnit.SECONDS.sleep(2);
         }
         assertThat(actualStatusMap.values().stream().filter(StringUtils::isNotBlank).collect(Collectors.toSet()).size(), is(1));
         jdbcTemplate.execute(String.format("STOP SCALING SOURCE WRITING %s", jobId));
diff --git a/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/framework/container/proxy/ShardingSphereProxyDockerContainer.java b/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/framework/container/proxy/ShardingSphereProxyDockerContainer.java
index 3decd53f789..6be53095f95 100644
--- a/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/framework/container/proxy/ShardingSphereProxyDockerContainer.java
+++ b/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/framework/container/proxy/ShardingSphereProxyDockerContainer.java
@@ -51,7 +51,7 @@ public final class ShardingSphereProxyDockerContainer extends DockerITContainer
         } else {
             setWaitStrategy(new JDBCConnectionWaitStrategy(() -> DriverManager.getConnection(DataSourceEnvironment.getURL(databaseType, getHost(), getMappedPort(3307), ""), "root", "root")));
         }
-        withLogConsumer(new Slf4jLogConsumer(log));
+        withLogConsumer(new Slf4jLogConsumer(log).withSeparateOutputStreams());
     }
     
     private void mapConfigurationFiles() {
@@ -59,7 +59,7 @@ public final class ShardingSphereProxyDockerContainer extends DockerITContainer
         withClasspathResourceMapping("/env/logback.xml", "/opt/shardingsphere-proxy/conf/logback.xml", BindMode.READ_ONLY);
         if (ITEnvTypeEnum.NATIVE == IntegrationTestEnvironment.getInstance().getItEnvType()) {
             addFixedExposedPort(3307, 3307);
-            addFixedExposedPort(3308, 5005);
+            addFixedExposedPort(5005, 3308);
         }
     }
 }
diff --git a/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/resources/env/logback.xml b/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/resources/env/logback.xml
index b2ce9ab24d3..4f68c6eeb6b 100644
--- a/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/resources/env/logback.xml
+++ b/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/resources/env/logback.xml
@@ -22,12 +22,12 @@
             <pattern>[%-5level] %d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %logger{36} - %msg%n</pattern>
         </encoder>
     </appender>
-    <logger name="org.apache.shardingsphere" level="error" additivity="false">
+    <logger name="org.apache.shardingsphere" level="warn" additivity="false">
         <appender-ref ref="console" />
     </logger>
     
     <root>
-        <level value="error" />
+        <level value="WARN" />
         <appender-ref ref="console" />
     </root>
 </configuration>