You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@shardingsphere.apache.org by zh...@apache.org on 2022/05/20 11:01:15 UTC
[shardingsphere] branch master updated: Use PipelineDDLGenerator replace mysql and opengauss getting create table sql (#17822)
This is an automated email from the ASF dual-hosted git repository.
zhonghongsheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new 852b0047ffa Use PipelineDDLGenerator replace mysql and opengauss getting create table sql (#17822)
852b0047ffa is described below
commit 852b0047ffa24a971ee46d2e97beec7e14f1a6e6
Author: azexcy <10...@users.noreply.github.com>
AuthorDate: Fri May 20 19:01:06 2022 +0800
Use PipelineDDLGenerator replace mysql and opengauss getting create table sql (#17822)
---
.../pipeline/core/execute/PipelineJobExecutor.java | 1 +
.../datasource/AbstractDataSourcePreparer.java | 8 --
.../datasource/MySQLDataSourcePreparer.java | 40 +++----
.../datasource/OpenGaussDataSourcePreparer.java | 130 +++------------------
.../datasource/PostgreSQLDataSourcePreparer.java | 5 +-
.../data/pipeline/cases/base/BaseITCase.java | 5 +-
.../src/test/resources/env/logback.xml | 2 -
.../src/test/resources/logback-test.xml | 1 +
8 files changed, 36 insertions(+), 156 deletions(-)
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/execute/PipelineJobExecutor.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/execute/PipelineJobExecutor.java
index f90d9ae1093..9dd43a3c79a 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/execute/PipelineJobExecutor.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/execute/PipelineJobExecutor.java
@@ -125,5 +125,6 @@ public final class PipelineJobExecutor extends AbstractLifecycleExecutor {
@Override
protected void doStop() {
executor.shutdown();
+ executor.shutdownNow();
}
}
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/prepare/datasource/AbstractDataSourcePreparer.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/prepare/datasource/AbstractDataSourcePreparer.java
index 99ebc1c9054..73e134b6a8e 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/prepare/datasource/AbstractDataSourcePreparer.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/prepare/datasource/AbstractDataSourcePreparer.java
@@ -23,7 +23,6 @@ import org.apache.shardingsphere.data.pipeline.api.config.rulealtered.RuleAltere
import org.apache.shardingsphere.data.pipeline.api.config.rulealtered.TaskConfiguration;
import org.apache.shardingsphere.data.pipeline.api.datasource.PipelineDataSourceWrapper;
import org.apache.shardingsphere.data.pipeline.api.datasource.config.PipelineDataSourceConfigurationFactory;
-import org.apache.shardingsphere.data.pipeline.api.prepare.datasource.ActualTableDefinition;
import org.apache.shardingsphere.data.pipeline.api.prepare.datasource.TableDefinitionSQLType;
import org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceManager;
import org.apache.shardingsphere.data.pipeline.core.exception.PipelineJobPrepareFailedException;
@@ -35,12 +34,9 @@ import org.apache.shardingsphere.infra.database.type.DatabaseTypeFactory;
import java.sql.Connection;
import java.sql.SQLException;
import java.sql.Statement;
-import java.util.Arrays;
-import java.util.Collection;
import java.util.HashSet;
import java.util.Set;
import java.util.regex.Pattern;
-import java.util.stream.Collectors;
/**
* Abstract data source preparer.
@@ -125,10 +121,6 @@ public abstract class AbstractDataSourcePreparer implements DataSourcePreparer {
}
}
- protected final Collection<String> splitTableDefinitionToSQLs(final ActualTableDefinition actualTableDefinition) {
- return Arrays.stream(actualTableDefinition.getTableDefinition().split(";")).collect(Collectors.toList());
- }
-
// TODO simple lexer
protected final TableDefinitionSQLType getTableDefinitionSQLType(final String sql) {
if (PATTERN_CREATE_TABLE.matcher(sql).find()) {
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/prepare/datasource/MySQLDataSourcePreparer.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/prepare/datasource/MySQLDataSourcePreparer.java
index 76f881fc653..5a958aa1b1b 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/prepare/datasource/MySQLDataSourcePreparer.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/prepare/datasource/MySQLDataSourcePreparer.java
@@ -18,21 +18,19 @@
package org.apache.shardingsphere.data.pipeline.mysql.prepare.datasource;
import lombok.extern.slf4j.Slf4j;
-import org.apache.shardingsphere.data.pipeline.api.config.rulealtered.RuleAlteredJobConfiguration;
import org.apache.shardingsphere.data.pipeline.api.datanode.JobDataNodeEntry;
+import org.apache.shardingsphere.data.pipeline.core.context.PipelineContext;
import org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceManager;
import org.apache.shardingsphere.data.pipeline.core.exception.PipelineJobPrepareFailedException;
+import org.apache.shardingsphere.data.pipeline.core.metadata.generator.PipelineDDLGenerator;
import org.apache.shardingsphere.data.pipeline.core.prepare.datasource.AbstractDataSourcePreparer;
import org.apache.shardingsphere.data.pipeline.core.prepare.datasource.PrepareTargetTablesParameter;
-import org.apache.shardingsphere.data.pipeline.core.sqlbuilder.PipelineSQLBuilderFactory;
-import org.apache.shardingsphere.data.pipeline.mysql.sqlbuilder.MySQLPipelineSQLBuilder;
+import org.apache.shardingsphere.infra.database.type.dialect.MySQLDatabaseType;
import java.sql.Connection;
-import java.sql.ResultSet;
import java.sql.SQLException;
-import java.sql.Statement;
-import java.util.Collection;
-import java.util.stream.Collectors;
+import java.util.LinkedList;
+import java.util.List;
/**
* Data source preparer for MySQL.
@@ -40,20 +38,12 @@ import java.util.stream.Collectors;
@Slf4j
public final class MySQLDataSourcePreparer extends AbstractDataSourcePreparer {
- private static final MySQLPipelineSQLBuilder SQL_BUILDER = (MySQLPipelineSQLBuilder) PipelineSQLBuilderFactory.getInstance("MySQL");
-
@Override
public void prepareTargetTables(final PrepareTargetTablesParameter parameter) {
- RuleAlteredJobConfiguration jobConfig = parameter.getJobConfig();
PipelineDataSourceManager dataSourceManager = parameter.getDataSourceManager();
- try (
- Connection sourceConnection = getSourceCachedDataSource(jobConfig, dataSourceManager).getConnection();
- Connection targetConnection = getTargetCachedDataSource(parameter.getTaskConfig(), dataSourceManager).getConnection()) {
- Collection<String> logicTableNames = parameter.getTablesFirstDataNodes().getEntries().stream().map(JobDataNodeEntry::getLogicTableName).collect(Collectors.toList());
- for (String each : logicTableNames) {
- String createTableSQL = getCreateTableSQL(sourceConnection, each);
- createTableSQL = addIfNotExistsForCreateTableSQL(createTableSQL);
- executeTargetTableSQL(targetConnection, createTableSQL);
+ try (Connection targetConnection = getTargetCachedDataSource(parameter.getTaskConfig(), dataSourceManager).getConnection()) {
+ for (String each : getCreateTableSQL(parameter)) {
+ executeTargetTableSQL(targetConnection, addIfNotExistsForCreateTableSQL(each));
log.info("create target table '{}' success", each);
}
} catch (final SQLException ex) {
@@ -61,13 +51,13 @@ public final class MySQLDataSourcePreparer extends AbstractDataSourcePreparer {
}
}
- private String getCreateTableSQL(final Connection sourceConnection, final String logicTableName) throws SQLException {
- String showCreateTableSQL = "SHOW CREATE TABLE " + SQL_BUILDER.quote(logicTableName);
- try (Statement statement = sourceConnection.createStatement(); ResultSet resultSet = statement.executeQuery(showCreateTableSQL)) {
- if (!resultSet.next()) {
- throw new PipelineJobPrepareFailedException("show create table has no result, sql: " + showCreateTableSQL);
- }
- return resultSet.getString(2);
+ private List<String> getCreateTableSQL(final PrepareTargetTablesParameter parameter) {
+ PipelineDDLGenerator generator = new PipelineDDLGenerator(PipelineContext.getContextManager());
+ List<String> result = new LinkedList<>();
+ for (JobDataNodeEntry each : parameter.getTablesFirstDataNodes().getEntries()) {
+ String schemaName = parameter.getTableNameSchemaNameMapping().getSchemaName(each.getLogicTableName());
+ result.add(generator.generateLogicDDLSQL(new MySQLDatabaseType(), parameter.getJobConfig().getDatabaseName(), schemaName, each.getLogicTableName()));
}
+ return result;
}
}
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/prepare/datasource/OpenGaussDataSourcePreparer.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/prepare/datasource/OpenGaussDataSourcePreparer.java
index a3dd652e917..20b6776045c 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/prepare/datasource/OpenGaussDataSourcePreparer.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/prepare/datasource/OpenGaussDataSourcePreparer.java
@@ -17,31 +17,21 @@
package org.apache.shardingsphere.data.pipeline.opengauss.prepare.datasource;
+import com.google.common.base.Splitter;
import lombok.extern.slf4j.Slf4j;
-import org.apache.shardingsphere.data.pipeline.api.config.TableNameSchemaNameMapping;
+import org.apache.commons.lang3.StringUtils;
import org.apache.shardingsphere.data.pipeline.api.datanode.JobDataNodeEntry;
-import org.apache.shardingsphere.data.pipeline.api.datasource.PipelineDataSourceWrapper;
-import org.apache.shardingsphere.data.pipeline.api.datasource.config.PipelineDataSourceConfigurationFactory;
-import org.apache.shardingsphere.data.pipeline.api.datasource.config.impl.ShardingSpherePipelineDataSourceConfiguration;
-import org.apache.shardingsphere.data.pipeline.api.prepare.datasource.ActualTableDefinition;
-import org.apache.shardingsphere.data.pipeline.api.prepare.datasource.TableDefinitionSQLType;
-import org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceManager;
+import org.apache.shardingsphere.data.pipeline.core.context.PipelineContext;
import org.apache.shardingsphere.data.pipeline.core.exception.PipelineJobPrepareFailedException;
+import org.apache.shardingsphere.data.pipeline.core.metadata.generator.PipelineDDLGenerator;
import org.apache.shardingsphere.data.pipeline.core.prepare.datasource.AbstractDataSourcePreparer;
import org.apache.shardingsphere.data.pipeline.core.prepare.datasource.PrepareTargetTablesParameter;
-import org.apache.shardingsphere.infra.datanode.DataNode;
+import org.apache.shardingsphere.infra.database.type.dialect.OpenGaussDatabaseType;
import java.sql.Connection;
-import java.sql.ResultSet;
import java.sql.SQLException;
-import java.sql.Statement;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
import java.util.stream.Collectors;
/**
@@ -50,119 +40,27 @@ import java.util.stream.Collectors;
@Slf4j
public final class OpenGaussDataSourcePreparer extends AbstractDataSourcePreparer {
- private static final String WITH_OF_TABLE_EXTEND = "with (";
-
@Override
public void prepareTargetTables(final PrepareTargetTablesParameter parameter) {
- Collection<ActualTableDefinition> actualTableDefinitions;
- try {
- actualTableDefinitions = getActualTableDefinitions(parameter);
- } catch (final SQLException ex) {
- throw new PipelineJobPrepareFailedException("get table definitions failed.", ex);
- }
- Map<String, Collection<String>> createLogicTableSQLs = getCreateLogicTableSQLs(actualTableDefinitions, parameter.getTableNameSchemaNameMapping());
+ List<String> createLogicalTableSQLs = listCreateLogicalTableSQL(parameter);
try (Connection targetConnection = getTargetCachedDataSource(parameter.getTaskConfig(), parameter.getDataSourceManager()).getConnection()) {
- for (Entry<String, Collection<String>> entry : createLogicTableSQLs.entrySet()) {
- for (String each : entry.getValue()) {
+ for (String createLogicalTableSQL : createLogicalTableSQLs) {
+ for (String each : Splitter.on(";").splitToList(createLogicalTableSQL).stream().filter(StringUtils::isNotBlank).collect(Collectors.toList())) {
executeTargetTableSQL(targetConnection, each);
}
- log.info("create target table '{}' success", entry.getKey());
}
} catch (final SQLException ex) {
throw new PipelineJobPrepareFailedException("prepare target tables failed.", ex);
}
}
- private Collection<ActualTableDefinition> getActualTableDefinitions(final PrepareTargetTablesParameter parameter) throws SQLException {
- Collection<ActualTableDefinition> result = new LinkedList<>();
- ShardingSpherePipelineDataSourceConfiguration sourceDataSourceConfig = (ShardingSpherePipelineDataSourceConfiguration) PipelineDataSourceConfigurationFactory.newInstance(
- parameter.getJobConfig().getSource().getType(), parameter.getJobConfig().getSource().getParameter());
- // TODO reuse PipelineDataSourceManager
- try (PipelineDataSourceManager dataSourceManager = new PipelineDataSourceManager()) {
- for (JobDataNodeEntry each : parameter.getTablesFirstDataNodes().getEntries()) {
- DataNode dataNode = each.getDataNodes().get(0);
- // Keep dataSource to reuse
- PipelineDataSourceWrapper dataSource = dataSourceManager.getDataSource(sourceDataSourceConfig.getActualDataSourceConfig(dataNode.getDataSourceName()));
- try (Connection sourceConnection = dataSource.getConnection()) {
- String schemaName = parameter.getTableNameSchemaNameMapping().getSchemaName(each.getLogicTableName());
- String actualTableName = dataNode.getTableName();
- String tableDefinition = queryTableDefinition(sourceConnection, schemaName, actualTableName);
- log.info("getActualTableDefinitions, schemaName={}, dataNode={}, tableDefinition={}", schemaName, dataNode, tableDefinition);
- String logicTableName = each.getLogicTableName();
- result.add(new ActualTableDefinition(logicTableName, actualTableName, tableDefinition));
- }
- }
- }
- return result;
- }
-
- private String queryTableDefinition(final Connection sourceConnection, final String schemaName, final String actualTableName) throws SQLException {
- String sql = String.format("SELECT * FROM pg_get_tabledef('%s.%s'::regclass::oid)", schemaName, actualTableName);
- log.info("queryTableDefinition, sql={}", sql);
- try (Statement statement = sourceConnection.createStatement(); ResultSet resultSet = statement.executeQuery(sql)) {
- if (!resultSet.next()) {
- throw new PipelineJobPrepareFailedException("table definition has no result, sql: " + sql);
- }
- return resultSet.getString(1);
- }
- }
-
- /**
- * Get create logic table SQLs.
- *
- * @param actualTableDefinitions actual table definitions. key is logic table name, value is actual table definition.
- * @return all SQLs. key is logic table name, value is collection of logic table SQLs.
- */
- private Map<String, Collection<String>> getCreateLogicTableSQLs(final Collection<ActualTableDefinition> actualTableDefinitions, final TableNameSchemaNameMapping tableNameSchemaNameMapping) {
- Map<String, Collection<String>> result = new HashMap<>();
- for (ActualTableDefinition each : actualTableDefinitions) {
- String schemaName = tableNameSchemaNameMapping.getSchemaName(each.getLogicTableName());
- Collection<String> logicTableSQLs = splitTableDefinitionToSQLs(each).stream().map(sql -> {
- TableDefinitionSQLType sqlType = getTableDefinitionSQLType(sql);
- switch (sqlType) {
- // TODO replace constraint and index name
- case CREATE_TABLE:
- sql = addIfNotExistsForCreateTableSQL(sql);
- sql = appendSchemaName(sql, each.getActualTableName(), schemaName);
- sql = replaceActualTableNameToLogicTableName(sql, each.getActualTableName(), each.getLogicTableName());
- sql = skipCreateTableExtendSet(sql);
- return sql;
- case ALTER_TABLE:
- sql = appendSchemaName(sql, each.getActualTableName(), schemaName);
- return replaceActualTableNameToLogicTableName(sql, each.getActualTableName(), each.getLogicTableName());
- default:
- return "";
- }
- }).filter(sql -> !"".equals(sql)).collect(Collectors.toList());
- result.put(each.getLogicTableName(), logicTableSQLs);
+ private List<String> listCreateLogicalTableSQL(final PrepareTargetTablesParameter parameter) {
+ PipelineDDLGenerator generator = new PipelineDDLGenerator(PipelineContext.getContextManager());
+ List<String> result = new LinkedList<>();
+ for (JobDataNodeEntry each : parameter.getTablesFirstDataNodes().getEntries()) {
+ String schemaName = parameter.getTableNameSchemaNameMapping().getSchemaName(each.getLogicTableName());
+ result.add(generator.generateLogicDDLSQL(new OpenGaussDatabaseType(), parameter.getJobConfig().getDatabaseName(), schemaName, each.getLogicTableName()));
}
return result;
}
-
- private String appendSchemaName(final String createOrAlterTableSQL, final String actualTableName, final String schemaName) {
- int start = createOrAlterTableSQL.indexOf(actualTableName);
- if (start <= 0) {
- return createOrAlterTableSQL;
- }
- return new StringBuilder(createOrAlterTableSQL).insert(start, schemaName + ".").toString();
- }
-
- private String skipCreateTableExtendSet(final String createSQL) {
- String lowerCreateSQL = createSQL.toLowerCase();
- String[] search = {WITH_OF_TABLE_EXTEND, ")"};
- List<Integer> searchPos = new ArrayList<>(2);
- int startPos = 0;
- for (String each : search) {
- int curSearch = lowerCreateSQL.indexOf(each, startPos);
- if (curSearch <= 0) {
- break;
- }
- searchPos.add(curSearch);
- startPos = curSearch;
- }
- if (searchPos.size() != search.length) {
- return createSQL;
- }
- return createSQL.substring(0, searchPos.get(0)) + createSQL.substring(searchPos.get(1) + 1);
- }
}
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/prepare/datasource/PostgreSQLDataSourcePreparer.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/prepare/datasource/PostgreSQLDataSourcePreparer.java
index 8800a400976..8168da6990e 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/prepare/datasource/PostgreSQLDataSourcePreparer.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/prepare/datasource/PostgreSQLDataSourcePreparer.java
@@ -26,7 +26,6 @@ import org.apache.shardingsphere.data.pipeline.core.exception.PipelineJobPrepare
import org.apache.shardingsphere.data.pipeline.core.metadata.generator.PipelineDDLGenerator;
import org.apache.shardingsphere.data.pipeline.core.prepare.datasource.AbstractDataSourcePreparer;
import org.apache.shardingsphere.data.pipeline.core.prepare.datasource.PrepareTargetTablesParameter;
-import org.apache.shardingsphere.infra.database.type.DatabaseType;
import org.apache.shardingsphere.infra.database.type.dialect.PostgreSQLDatabaseType;
import java.sql.Connection;
@@ -41,8 +40,6 @@ import java.util.stream.Collectors;
@Slf4j
public final class PostgreSQLDataSourcePreparer extends AbstractDataSourcePreparer {
- private static final DatabaseType DATABASE_TYPE = new PostgreSQLDatabaseType();
-
@Override
public void prepareTargetTables(final PrepareTargetTablesParameter parameter) {
List<String> createLogicalTableSQLs = listCreateLogicalTableSQL(parameter);
@@ -62,7 +59,7 @@ public final class PostgreSQLDataSourcePreparer extends AbstractDataSourcePrepar
List<String> result = new LinkedList<>();
for (JobDataNodeEntry each : parameter.getTablesFirstDataNodes().getEntries()) {
String schemaName = parameter.getTableNameSchemaNameMapping().getSchemaName(each.getLogicTableName());
- result.add(generator.generateLogicDDLSQL(DATABASE_TYPE, parameter.getJobConfig().getDatabaseName(), schemaName, each.getLogicTableName()));
+ result.add(generator.generateLogicDDLSQL(new PostgreSQLDatabaseType(), parameter.getJobConfig().getDatabaseName(), schemaName, each.getLogicTableName()));
}
return result;
}
diff --git a/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/base/BaseITCase.java b/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/base/BaseITCase.java
index d726d388b1c..af08177c3c6 100644
--- a/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/base/BaseITCase.java
+++ b/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/base/BaseITCase.java
@@ -225,7 +225,10 @@ public abstract class BaseITCase {
increaseTaskThread.join(60 * 1000L);
}
TimeUnit.SECONDS.sleep(2);
- String jobId = String.valueOf(getJdbcTemplate().queryForMap("SHOW SCALING LIST").get("id"));
+ List<Map<String, Object>> scalingListMap = getJdbcTemplate().queryForList("SHOW SCALING LIST");
+ assertThat(scalingListMap.size(), is(1));
+ Object jobId = scalingListMap.get(0).get("id");
+ log.info("jobId: {}", jobId);
Map<String, String> actualStatusMap = new HashMap<>(2, 1);
for (int i = 0; i < 100; i++) {
List<Map<String, Object>> showScalingStatusResMap = jdbcTemplate.queryForList(String.format("SHOW SCALING STATUS %s", jobId));
diff --git a/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/resources/env/logback.xml b/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/resources/env/logback.xml
index 281fafeae6a..487879aca84 100644
--- a/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/resources/env/logback.xml
+++ b/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/resources/env/logback.xml
@@ -34,9 +34,7 @@
<logger name="org.apache.shardingsphere.mode.manager.cluster.coordinator.lock" level="DEBUG" additivity="false">
<appender-ref ref="console" />
</logger>
- <logger name="org.testcontainers.containers.ContainerState" level="OFF" />
<logger name="com.zaxxer.hikari.pool.ProxyConnection" level="OFF" />
- <logger name="com.atomikos.icatch.provider" level="OFF" />
<root>
<level value="WARN" />
<appender-ref ref="console" />
diff --git a/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/resources/logback-test.xml b/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/resources/logback-test.xml
index b13e574bb64..200d454e788 100644
--- a/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/resources/logback-test.xml
+++ b/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/resources/logback-test.xml
@@ -25,6 +25,7 @@
<logger name="org.springframework.jdbc.core.JdbcTemplate" level="DEBUG" additivity="false">
<appender-ref ref="console" />
</logger>
+ <logger name="org.apache.shardingsphere.test.integration.framework.container.atomic.DockerITContainer" level="WARN" />
<logger name="com.zaxxer.hikari.pool.ProxyConnection" level="OFF" />
<root level="INFO">
<appender-ref ref="console" />