You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@shardingsphere.apache.org by pa...@apache.org on 2022/10/02 13:43:25 UTC

[shardingsphere] branch master updated: Refactor AbstractInventoryDumper (#21319)

This is an automated email from the ASF dual-hosted git repository.

panjuan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new 627ddecf519 Refactor AbstractInventoryDumper (#21319)
627ddecf519 is described below

commit 627ddecf5191936007291aad9e446ff30b06952a
Author: Liang Zhang <zh...@apache.org>
AuthorDate: Sun Oct 2 21:43:17 2022 +0800

    Refactor AbstractInventoryDumper (#21319)
---
 .../loader/PipelineTableMetaDataLoader.java        |   8 +-
 .../ingest/dumper/AbstractInventoryDumper.java     | 149 +++++++++------------
 .../core/ingest/dumper/DefaultInventoryDumper.java |   9 --
 .../mysql/ingest/MySQLInventoryDumper.java         |  13 +-
 .../mysql/ingest/MySQLInventoryDumperTest.java     |   3 +-
 .../ingest/PostgreSQLInventoryDumper.java          |   8 +-
 .../ingest/PostgreSQLJdbcDumperTest.java           |   3 +-
 7 files changed, 79 insertions(+), 114 deletions(-)

diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/metadata/loader/PipelineTableMetaDataLoader.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/metadata/loader/PipelineTableMetaDataLoader.java
index 5fff7bb307c..0e9ab49ffa7 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/metadata/loader/PipelineTableMetaDataLoader.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/metadata/loader/PipelineTableMetaDataLoader.java
@@ -20,16 +20,16 @@ package org.apache.shardingsphere.data.pipeline.api.metadata.loader;
 import org.apache.shardingsphere.data.pipeline.api.metadata.model.PipelineTableMetaData;
 
 /**
- * Pipeline table metadata loader.
+ * Pipeline table meta data loader.
  */
 public interface PipelineTableMetaDataLoader {
     
     /**
-     * Get table metadata, load if it does not exist.
+     * Get table meta data, load if meta data absent.
      *
-     * @param schemaName schema name. nullable
+     * @param schemaName schema name, can be nullable
      * @param tableName dedicated table name, not table name pattern
-     * @return table metadata
+     * @return got table meta data
      */
     PipelineTableMetaData getTableMetaData(String schemaName, String tableName);
 }
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/AbstractInventoryDumper.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/AbstractInventoryDumper.java
index d14b3ccfde9..2d0656e53fa 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/AbstractInventoryDumper.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/AbstractInventoryDumper.java
@@ -36,7 +36,6 @@ import org.apache.shardingsphere.data.pipeline.api.ingest.position.PrimaryKeyPos
 import org.apache.shardingsphere.data.pipeline.api.ingest.record.Column;
 import org.apache.shardingsphere.data.pipeline.api.ingest.record.DataRecord;
 import org.apache.shardingsphere.data.pipeline.api.ingest.record.FinishedRecord;
-import org.apache.shardingsphere.data.pipeline.api.ingest.record.Record;
 import org.apache.shardingsphere.data.pipeline.api.job.JobOperationType;
 import org.apache.shardingsphere.data.pipeline.api.metadata.LogicTableName;
 import org.apache.shardingsphere.data.pipeline.api.metadata.loader.PipelineTableMetaDataLoader;
@@ -47,8 +46,8 @@ import org.apache.shardingsphere.data.pipeline.core.ingest.exception.IngestExcep
 import org.apache.shardingsphere.data.pipeline.core.sqlbuilder.PipelineSQLBuilderFactory;
 import org.apache.shardingsphere.data.pipeline.core.util.PipelineJdbcUtils;
 import org.apache.shardingsphere.data.pipeline.spi.ingest.dumper.ColumnValueReader;
-import org.apache.shardingsphere.data.pipeline.spi.ratelimit.JobRateLimitAlgorithm;
 import org.apache.shardingsphere.data.pipeline.spi.sqlbuilder.PipelineSQLBuilder;
+import org.apache.shardingsphere.infra.util.exception.ShardingSpherePreconditions;
 import org.apache.shardingsphere.infra.util.exception.external.sql.type.generic.UnsupportedSQLOperationException;
 
 import javax.sql.DataSource;
@@ -70,146 +69,128 @@ public abstract class AbstractInventoryDumper extends AbstractLifecycleExecutor
     
     private final PipelineChannel channel;
     
-    private final PipelineSQLBuilder pipelineSQLBuilder;
-    
-    private final ColumnValueReader columnValueReader;
-    
     private final DataSource dataSource;
     
-    private final int batchSize;
+    private final PipelineSQLBuilder sqlBuilder;
     
-    private final JobRateLimitAlgorithm rateLimitAlgorithm;
+    private final ColumnValueReader columnValueReader;
     
-    private final LazyInitializer<PipelineTableMetaData> tableMetaDataLazyInitializer;
+    private final LazyInitializer<PipelineTableMetaData> metaDataLoader;
     
-    protected AbstractInventoryDumper(final InventoryDumperConfiguration inventoryDumperConfig, final PipelineChannel channel,
-                                      final DataSource dataSource, final PipelineTableMetaDataLoader metaDataLoader) {
-        if (!StandardPipelineDataSourceConfiguration.class.equals(inventoryDumperConfig.getDataSourceConfig().getClass())) {
-            throw new UnsupportedSQLOperationException("AbstractInventoryDumper only support StandardPipelineDataSourceConfiguration");
-        }
-        this.dumperConfig = inventoryDumperConfig;
+    protected AbstractInventoryDumper(final InventoryDumperConfiguration dumperConfig, final PipelineChannel channel, final DataSource dataSource, final PipelineTableMetaDataLoader metaDataLoader) {
+        ShardingSpherePreconditions.checkState(StandardPipelineDataSourceConfiguration.class.equals(dumperConfig.getDataSourceConfig().getClass()),
+                () -> new UnsupportedSQLOperationException("AbstractInventoryDumper only support StandardPipelineDataSourceConfiguration"));
+        this.dumperConfig = dumperConfig;
         this.channel = channel;
-        pipelineSQLBuilder = PipelineSQLBuilderFactory.getInstance(inventoryDumperConfig.getDataSourceConfig().getDatabaseType().getType());
-        columnValueReader = ColumnValueReaderFactory.getInstance(inventoryDumperConfig.getDataSourceConfig().getDatabaseType().getType());
         this.dataSource = dataSource;
-        batchSize = inventoryDumperConfig.getBatchSize();
-        rateLimitAlgorithm = inventoryDumperConfig.getRateLimitAlgorithm();
-        tableMetaDataLazyInitializer = new LazyInitializer<PipelineTableMetaData>() {
+        sqlBuilder = PipelineSQLBuilderFactory.getInstance(dumperConfig.getDataSourceConfig().getDatabaseType().getType());
+        columnValueReader = ColumnValueReaderFactory.getInstance(dumperConfig.getDataSourceConfig().getDatabaseType().getType());
+        this.metaDataLoader = new LazyInitializer<PipelineTableMetaData>() {
             
             @Override
             protected PipelineTableMetaData initialize() {
-                String schemaName = inventoryDumperConfig.getSchemaName(new LogicTableName(inventoryDumperConfig.getLogicTableName()));
-                return metaDataLoader.getTableMetaData(schemaName, inventoryDumperConfig.getActualTableName());
+                return metaDataLoader.getTableMetaData(dumperConfig.getSchemaName(new LogicTableName(dumperConfig.getLogicTableName())), dumperConfig.getActualTableName());
             }
         };
     }
     
     @Override
     protected void runBlocking() {
-        dump();
-    }
-    
-    private void dump() {
         String schemaName = dumperConfig.getSchemaName(new LogicTableName(dumperConfig.getLogicTableName()));
-        int uniqueKeyDataType = dumperConfig.getUniqueKeyDataType();
-        String firstSQL = pipelineSQLBuilder.buildInventoryDumpSQL(schemaName, dumperConfig.getActualTableName(), dumperConfig.getUniqueKey(), uniqueKeyDataType, true);
-        String laterSQL = pipelineSQLBuilder.buildInventoryDumpSQL(schemaName, dumperConfig.getActualTableName(), dumperConfig.getUniqueKey(), uniqueKeyDataType, false);
+        String firstSQL = sqlBuilder.buildInventoryDumpSQL(schemaName, dumperConfig.getActualTableName(), dumperConfig.getUniqueKey(), dumperConfig.getUniqueKeyDataType(), true);
+        String laterSQL = sqlBuilder.buildInventoryDumpSQL(schemaName, dumperConfig.getActualTableName(), dumperConfig.getUniqueKey(), dumperConfig.getUniqueKeyDataType(), false);
         IngestPosition<?> position = dumperConfig.getPosition();
-        log.info("inventory dump, uniqueKeyDataType={}, firstSQL={}, laterSQL={}, position={}", uniqueKeyDataType, firstSQL, laterSQL, position);
+        log.info("Inventory dump, uniqueKeyDataType={}, firstSQL={}, laterSQL={}, position={}.", dumperConfig.getUniqueKeyDataType(), firstSQL, laterSQL, position);
         if (position instanceof FinishedPosition) {
-            log.info("It is already finished, ignore");
+            log.info("Ignored because of already finished.");
             return;
         }
-        Object startUniqueKeyValue = getPositionBeginValue(position);
-        try (Connection conn = dataSource.getConnection()) {
+        Object beginUniqueKeyValue = ((PrimaryKeyPosition<?>) position).getBeginValue();
+        try (Connection connection = dataSource.getConnection()) {
             int round = 1;
             Optional<Object> maxUniqueKeyValue;
-            while ((maxUniqueKeyValue = dump0(conn, 1 == round ? firstSQL : laterSQL, dumperConfig.getUniqueKey(), uniqueKeyDataType, startUniqueKeyValue, round++)).isPresent()) {
-                startUniqueKeyValue = maxUniqueKeyValue.get();
+            while ((maxUniqueKeyValue = dump(connection, 1 == round ? firstSQL : laterSQL, beginUniqueKeyValue, round++)).isPresent()) {
+                beginUniqueKeyValue = maxUniqueKeyValue.get();
                 if (!isRunning()) {
-                    log.info("inventory dump, running is false, break");
+                    log.info("Broke because of inventory dump is not running.");
                     break;
                 }
             }
-            log.info("inventory dump done, round={}, maxUniqueKeyValue={}", round, maxUniqueKeyValue);
+            log.info("Inventory dump done, round={}, maxUniqueKeyValue={}.", round, maxUniqueKeyValue);
         } catch (final SQLException ex) {
-            log.error("inventory dump, ex caught, msg={}", ex.getMessage());
+            log.error("Inventory dump, ex caught, msg={}.", ex.getMessage());
             throw new IngestException(ex);
         } finally {
-            log.info("inventory dump, before put FinishedRecord");
-            pushRecord(new FinishedRecord(new FinishedPosition()));
+            log.info("Inventory dump, before put FinishedRecord.");
+            channel.pushRecord(new FinishedRecord(new FinishedPosition()));
         }
     }
     
     @SneakyThrows(ConcurrentException.class)
-    private PipelineTableMetaData getTableMetaData() {
-        return tableMetaDataLazyInitializer.get();
-    }
-    
-    private Optional<Object> dump0(final Connection conn, final String sql, final String uniqueKey, final int uniqueKeyDataType, final Object startUniqueKeyValue,
-                                   final int round) throws SQLException {
-        if (null != rateLimitAlgorithm) {
-            rateLimitAlgorithm.intercept(JobOperationType.SELECT, 1);
+    private Optional<Object> dump(final Connection connection, final String sql, final Object beginUniqueKeyValue, final int round) throws SQLException {
+        if (null != dumperConfig.getRateLimitAlgorithm()) {
+            dumperConfig.getRateLimitAlgorithm().intercept(JobOperationType.SELECT, 1);
         }
-        PipelineTableMetaData tableMetaData = getTableMetaData();
-        try (PreparedStatement preparedStatement = createPreparedStatement(conn, sql)) {
-            preparedStatement.setFetchSize(batchSize);
-            if (PipelineJdbcUtils.isIntegerColumn(uniqueKeyDataType)) {
-                preparedStatement.setObject(1, startUniqueKeyValue);
-                preparedStatement.setObject(2, getPositionEndValue(dumperConfig.getPosition()));
-                preparedStatement.setInt(3, batchSize);
-            } else if (PipelineJdbcUtils.isStringColumn(uniqueKeyDataType)) {
-                preparedStatement.setObject(1, startUniqueKeyValue);
-                preparedStatement.setInt(2, batchSize);
-            } else {
-                throw new UnsupportedPipelineJobUniqueKeyDataTypeException(uniqueKeyDataType);
-            }
+        int batchSize = dumperConfig.getBatchSize();
+        PipelineTableMetaData tableMetaData = metaDataLoader.get();
+        try (PreparedStatement preparedStatement = connection.prepareStatement(sql, ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY)) {
+            setDialectParameters(preparedStatement);
+            setPreparedStatementParameters(preparedStatement, batchSize, beginUniqueKeyValue);
             try (ResultSet resultSet = preparedStatement.executeQuery()) {
                 ResultSetMetaData resultSetMetaData = resultSet.getMetaData();
                 int rowCount = 0;
                 Object maxUniqueKeyValue = null;
-                String logicTableName = dumperConfig.getLogicTableName();
                 while (resultSet.next()) {
-                    DataRecord record = new DataRecord(newPosition(resultSet), resultSetMetaData.getColumnCount());
-                    record.setType(IngestDataChangeType.INSERT);
-                    record.setTableName(logicTableName);
-                    maxUniqueKeyValue = columnValueReader.readValue(resultSet, resultSetMetaData, tableMetaData.getColumnMetaData(uniqueKey).getOrdinalPosition());
-                    for (int i = 1; i <= resultSetMetaData.getColumnCount(); i++) {
-                        boolean isUniqueKey = tableMetaData.getColumnMetaData(i).isUniqueKey();
-                        record.addColumn(new Column(resultSetMetaData.getColumnName(i), columnValueReader.readValue(resultSet, resultSetMetaData, i), true, isUniqueKey));
-                    }
-                    pushRecord(record);
+                    channel.pushRecord(loadDataRecord(resultSet, resultSetMetaData, tableMetaData));
+                    maxUniqueKeyValue = columnValueReader.readValue(resultSet, resultSetMetaData, tableMetaData.getColumnMetaData(dumperConfig.getUniqueKey()).getOrdinalPosition());
                     rowCount++;
                     if (!isRunning()) {
-                        log.info("dump, running is false, break");
+                        log.info("Broke because of inventory dump is not running.");
                         break;
                     }
                 }
                 if (0 == round % 50) {
-                    log.info("dump, round={}, rowCount={}, maxUniqueKeyValue={}", round, rowCount, maxUniqueKeyValue);
+                    log.info("Dumping, round={}, rowCount={}, maxUniqueKeyValue={}.", round, rowCount, maxUniqueKeyValue);
                 }
                 return Optional.ofNullable(maxUniqueKeyValue);
             }
         }
     }
     
-    private Object getPositionBeginValue(final IngestPosition<?> position) {
-        return ((PrimaryKeyPosition<?>) position).getBeginValue();
+    private void setPreparedStatementParameters(final PreparedStatement preparedStatement, final int batchSize, final Object beginUniqueKeyValue) throws SQLException {
+        preparedStatement.setFetchSize(batchSize);
+        if (PipelineJdbcUtils.isIntegerColumn(dumperConfig.getUniqueKeyDataType())) {
+            preparedStatement.setObject(1, beginUniqueKeyValue);
+            preparedStatement.setObject(2, ((PrimaryKeyPosition<?>) dumperConfig.getPosition()).getEndValue());
+            preparedStatement.setInt(3, batchSize);
+            return;
+        }
+        if (PipelineJdbcUtils.isStringColumn(dumperConfig.getUniqueKeyDataType())) {
+            preparedStatement.setObject(1, beginUniqueKeyValue);
+            preparedStatement.setInt(2, batchSize);
+            return;
+        }
+        throw new UnsupportedPipelineJobUniqueKeyDataTypeException(dumperConfig.getUniqueKeyDataType());
     }
     
-    private Object getPositionEndValue(final IngestPosition<?> position) {
-        return ((PrimaryKeyPosition<?>) position).getEndValue();
+    private DataRecord loadDataRecord(final ResultSet resultSet, final ResultSetMetaData resultSetMetaData, final PipelineTableMetaData tableMetaData) throws SQLException {
+        int columnCount = resultSetMetaData.getColumnCount();
+        DataRecord result = new DataRecord(newPosition(resultSet), columnCount);
+        result.setType(IngestDataChangeType.INSERT);
+        result.setTableName(dumperConfig.getLogicTableName());
+        for (int i = 1; i <= columnCount; i++) {
+            result.addColumn(new Column(resultSetMetaData.getColumnName(i), columnValueReader.readValue(resultSet, resultSetMetaData, i), true, tableMetaData.getColumnMetaData(i).isUniqueKey()));
+        }
+        return result;
     }
     
-    private IngestPosition<?> newPosition(final ResultSet rs) throws SQLException {
-        return null == dumperConfig.getUniqueKey() ? new PlaceholderPosition()
-                : PrimaryKeyPositionFactory.newInstance(rs.getObject(dumperConfig.getUniqueKey()), ((PrimaryKeyPosition<?>) dumperConfig.getPosition()).getEndValue());
+    private IngestPosition<?> newPosition(final ResultSet resultSet) throws SQLException {
+        return null == dumperConfig.getUniqueKey()
+                ? new PlaceholderPosition()
+                : PrimaryKeyPositionFactory.newInstance(resultSet.getObject(dumperConfig.getUniqueKey()), ((PrimaryKeyPosition<?>) dumperConfig.getPosition()).getEndValue());
     }
     
-    protected abstract PreparedStatement createPreparedStatement(Connection connection, String sql) throws SQLException;
-    
-    private void pushRecord(final Record record) {
-        channel.pushRecord(record);
+    protected void setDialectParameters(final PreparedStatement preparedStatement) throws SQLException {
     }
     
     @Override
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/DefaultInventoryDumper.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/DefaultInventoryDumper.java
index 596f740a609..df1b8454175 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/DefaultInventoryDumper.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/DefaultInventoryDumper.java
@@ -22,10 +22,6 @@ import org.apache.shardingsphere.data.pipeline.api.ingest.channel.PipelineChanne
 import org.apache.shardingsphere.data.pipeline.api.metadata.loader.PipelineTableMetaDataLoader;
 
 import javax.sql.DataSource;
-import java.sql.Connection;
-import java.sql.PreparedStatement;
-import java.sql.ResultSet;
-import java.sql.SQLException;
 
 /**
  * Default inventory dumper.
@@ -36,9 +32,4 @@ public final class DefaultInventoryDumper extends AbstractInventoryDumper {
                                   final DataSource dataSource, final PipelineTableMetaDataLoader metaDataLoader) {
         super(inventoryDumperConfig, channel, dataSource, metaDataLoader);
     }
-    
-    @Override
-    protected PreparedStatement createPreparedStatement(final Connection connection, final String sql) throws SQLException {
-        return connection.prepareStatement(sql, ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY);
-    }
 }
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLInventoryDumper.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLInventoryDumper.java
index 6fd46b68b3a..307653b2920 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLInventoryDumper.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLInventoryDumper.java
@@ -23,9 +23,7 @@ import org.apache.shardingsphere.data.pipeline.api.metadata.loader.PipelineTable
 import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.AbstractInventoryDumper;
 
 import javax.sql.DataSource;
-import java.sql.Connection;
 import java.sql.PreparedStatement;
-import java.sql.ResultSet;
 import java.sql.SQLException;
 
 /**
@@ -33,15 +31,12 @@ import java.sql.SQLException;
  */
 public final class MySQLInventoryDumper extends AbstractInventoryDumper {
     
-    public MySQLInventoryDumper(final InventoryDumperConfiguration inventoryDumperConfig, final PipelineChannel channel,
-                                final DataSource dataSource, final PipelineTableMetaDataLoader metaDataLoader) {
-        super(inventoryDumperConfig, channel, dataSource, metaDataLoader);
+    public MySQLInventoryDumper(final InventoryDumperConfiguration dumperConfig, final PipelineChannel channel, final DataSource dataSource, final PipelineTableMetaDataLoader metaDataLoader) {
+        super(dumperConfig, channel, dataSource, metaDataLoader);
     }
     
     @Override
-    protected PreparedStatement createPreparedStatement(final Connection connection, final String sql) throws SQLException {
-        PreparedStatement result = connection.prepareStatement(sql, ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY);
-        result.setFetchSize(Integer.MIN_VALUE);
-        return result;
+    protected void setDialectParameters(final PreparedStatement preparedStatement) throws SQLException {
+        preparedStatement.setFetchSize(Integer.MIN_VALUE);
     }
 }
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLInventoryDumperTest.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLInventoryDumperTest.java
index d8aaf019c89..5f861f0f51c 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLInventoryDumperTest.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLInventoryDumperTest.java
@@ -80,7 +80,8 @@ public final class MySQLInventoryDumperTest {
     public void assertCreatePreparedStatement() throws SQLException {
         Connection connection = mock(Connection.class);
         when(connection.prepareStatement("", ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY)).thenReturn(mock(PreparedStatement.class));
-        PreparedStatement preparedStatement = mysqlJdbcDumper.createPreparedStatement(connection, "");
+        PreparedStatement preparedStatement = connection.prepareStatement("", ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY);
+        mysqlJdbcDumper.setDialectParameters(preparedStatement);
         verify(preparedStatement).setFetchSize(Integer.MIN_VALUE);
     }
 }
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLInventoryDumper.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLInventoryDumper.java
index 3cbf9a158b5..143fdc5172c 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLInventoryDumper.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLInventoryDumper.java
@@ -23,9 +23,7 @@ import org.apache.shardingsphere.data.pipeline.api.metadata.loader.PipelineTable
 import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.AbstractInventoryDumper;
 
 import javax.sql.DataSource;
-import java.sql.Connection;
 import java.sql.PreparedStatement;
-import java.sql.ResultSet;
 import java.sql.SQLException;
 
 /**
@@ -39,9 +37,7 @@ public final class PostgreSQLInventoryDumper extends AbstractInventoryDumper {
     }
     
     @Override
-    protected PreparedStatement createPreparedStatement(final Connection connection, final String sql) throws SQLException {
-        PreparedStatement result = connection.prepareStatement(sql, ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY);
-        result.setFetchSize(1);
-        return result;
+    protected void setDialectParameters(final PreparedStatement preparedStatement) throws SQLException {
+        preparedStatement.setFetchSize(1);
     }
 }
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLJdbcDumperTest.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLJdbcDumperTest.java
index 36cd7d478ae..c04d4915aa2 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLJdbcDumperTest.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLJdbcDumperTest.java
@@ -77,7 +77,8 @@ public final class PostgreSQLJdbcDumperTest {
     public void assertCreatePreparedStatement() throws SQLException {
         try (
                 Connection connection = dataSource.getConnection();
-                PreparedStatement preparedStatement = jdbcDumper.createPreparedStatement(connection, "SELECT * FROM t_order")) {
+                PreparedStatement preparedStatement = connection.prepareStatement("SELECT * FROM t_order")) {
+            jdbcDumper.setDialectParameters(preparedStatement);
             assertThat(preparedStatement.getFetchSize(), is(1));
         }
     }