You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@shardingsphere.apache.org by pa...@apache.org on 2022/10/02 13:43:25 UTC
[shardingsphere] branch master updated: Refactor AbstractInventoryDumper (#21319)
This is an automated email from the ASF dual-hosted git repository.
panjuan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new 627ddecf519 Refactor AbstractInventoryDumper (#21319)
627ddecf519 is described below
commit 627ddecf5191936007291aad9e446ff30b06952a
Author: Liang Zhang <zh...@apache.org>
AuthorDate: Sun Oct 2 21:43:17 2022 +0800
Refactor AbstractInventoryDumper (#21319)
---
.../loader/PipelineTableMetaDataLoader.java | 8 +-
.../ingest/dumper/AbstractInventoryDumper.java | 149 +++++++++------------
.../core/ingest/dumper/DefaultInventoryDumper.java | 9 --
.../mysql/ingest/MySQLInventoryDumper.java | 13 +-
.../mysql/ingest/MySQLInventoryDumperTest.java | 3 +-
.../ingest/PostgreSQLInventoryDumper.java | 8 +-
.../ingest/PostgreSQLJdbcDumperTest.java | 3 +-
7 files changed, 79 insertions(+), 114 deletions(-)
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/metadata/loader/PipelineTableMetaDataLoader.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/metadata/loader/PipelineTableMetaDataLoader.java
index 5fff7bb307c..0e9ab49ffa7 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/metadata/loader/PipelineTableMetaDataLoader.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/metadata/loader/PipelineTableMetaDataLoader.java
@@ -20,16 +20,16 @@ package org.apache.shardingsphere.data.pipeline.api.metadata.loader;
import org.apache.shardingsphere.data.pipeline.api.metadata.model.PipelineTableMetaData;
/**
- * Pipeline table metadata loader.
+ * Pipeline table meta data loader.
*/
public interface PipelineTableMetaDataLoader {
/**
- * Get table metadata, load if it does not exist.
+ * Get table meta data, load if meta data absent.
*
- * @param schemaName schema name. nullable
+ * @param schemaName schema name, can be nullable
* @param tableName dedicated table name, not table name pattern
- * @return table metadata
+ * @return got table meta data
*/
PipelineTableMetaData getTableMetaData(String schemaName, String tableName);
}
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/AbstractInventoryDumper.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/AbstractInventoryDumper.java
index d14b3ccfde9..2d0656e53fa 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/AbstractInventoryDumper.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/AbstractInventoryDumper.java
@@ -36,7 +36,6 @@ import org.apache.shardingsphere.data.pipeline.api.ingest.position.PrimaryKeyPos
import org.apache.shardingsphere.data.pipeline.api.ingest.record.Column;
import org.apache.shardingsphere.data.pipeline.api.ingest.record.DataRecord;
import org.apache.shardingsphere.data.pipeline.api.ingest.record.FinishedRecord;
-import org.apache.shardingsphere.data.pipeline.api.ingest.record.Record;
import org.apache.shardingsphere.data.pipeline.api.job.JobOperationType;
import org.apache.shardingsphere.data.pipeline.api.metadata.LogicTableName;
import org.apache.shardingsphere.data.pipeline.api.metadata.loader.PipelineTableMetaDataLoader;
@@ -47,8 +46,8 @@ import org.apache.shardingsphere.data.pipeline.core.ingest.exception.IngestExcep
import org.apache.shardingsphere.data.pipeline.core.sqlbuilder.PipelineSQLBuilderFactory;
import org.apache.shardingsphere.data.pipeline.core.util.PipelineJdbcUtils;
import org.apache.shardingsphere.data.pipeline.spi.ingest.dumper.ColumnValueReader;
-import org.apache.shardingsphere.data.pipeline.spi.ratelimit.JobRateLimitAlgorithm;
import org.apache.shardingsphere.data.pipeline.spi.sqlbuilder.PipelineSQLBuilder;
+import org.apache.shardingsphere.infra.util.exception.ShardingSpherePreconditions;
import org.apache.shardingsphere.infra.util.exception.external.sql.type.generic.UnsupportedSQLOperationException;
import javax.sql.DataSource;
@@ -70,146 +69,128 @@ public abstract class AbstractInventoryDumper extends AbstractLifecycleExecutor
private final PipelineChannel channel;
- private final PipelineSQLBuilder pipelineSQLBuilder;
-
- private final ColumnValueReader columnValueReader;
-
private final DataSource dataSource;
- private final int batchSize;
+ private final PipelineSQLBuilder sqlBuilder;
- private final JobRateLimitAlgorithm rateLimitAlgorithm;
+ private final ColumnValueReader columnValueReader;
- private final LazyInitializer<PipelineTableMetaData> tableMetaDataLazyInitializer;
+ private final LazyInitializer<PipelineTableMetaData> metaDataLoader;
- protected AbstractInventoryDumper(final InventoryDumperConfiguration inventoryDumperConfig, final PipelineChannel channel,
- final DataSource dataSource, final PipelineTableMetaDataLoader metaDataLoader) {
- if (!StandardPipelineDataSourceConfiguration.class.equals(inventoryDumperConfig.getDataSourceConfig().getClass())) {
- throw new UnsupportedSQLOperationException("AbstractInventoryDumper only support StandardPipelineDataSourceConfiguration");
- }
- this.dumperConfig = inventoryDumperConfig;
+ protected AbstractInventoryDumper(final InventoryDumperConfiguration dumperConfig, final PipelineChannel channel, final DataSource dataSource, final PipelineTableMetaDataLoader metaDataLoader) {
+ ShardingSpherePreconditions.checkState(StandardPipelineDataSourceConfiguration.class.equals(dumperConfig.getDataSourceConfig().getClass()),
+ () -> new UnsupportedSQLOperationException("AbstractInventoryDumper only support StandardPipelineDataSourceConfiguration"));
+ this.dumperConfig = dumperConfig;
this.channel = channel;
- pipelineSQLBuilder = PipelineSQLBuilderFactory.getInstance(inventoryDumperConfig.getDataSourceConfig().getDatabaseType().getType());
- columnValueReader = ColumnValueReaderFactory.getInstance(inventoryDumperConfig.getDataSourceConfig().getDatabaseType().getType());
this.dataSource = dataSource;
- batchSize = inventoryDumperConfig.getBatchSize();
- rateLimitAlgorithm = inventoryDumperConfig.getRateLimitAlgorithm();
- tableMetaDataLazyInitializer = new LazyInitializer<PipelineTableMetaData>() {
+ sqlBuilder = PipelineSQLBuilderFactory.getInstance(dumperConfig.getDataSourceConfig().getDatabaseType().getType());
+ columnValueReader = ColumnValueReaderFactory.getInstance(dumperConfig.getDataSourceConfig().getDatabaseType().getType());
+ this.metaDataLoader = new LazyInitializer<PipelineTableMetaData>() {
@Override
protected PipelineTableMetaData initialize() {
- String schemaName = inventoryDumperConfig.getSchemaName(new LogicTableName(inventoryDumperConfig.getLogicTableName()));
- return metaDataLoader.getTableMetaData(schemaName, inventoryDumperConfig.getActualTableName());
+ return metaDataLoader.getTableMetaData(dumperConfig.getSchemaName(new LogicTableName(dumperConfig.getLogicTableName())), dumperConfig.getActualTableName());
}
};
}
@Override
protected void runBlocking() {
- dump();
- }
-
- private void dump() {
String schemaName = dumperConfig.getSchemaName(new LogicTableName(dumperConfig.getLogicTableName()));
- int uniqueKeyDataType = dumperConfig.getUniqueKeyDataType();
- String firstSQL = pipelineSQLBuilder.buildInventoryDumpSQL(schemaName, dumperConfig.getActualTableName(), dumperConfig.getUniqueKey(), uniqueKeyDataType, true);
- String laterSQL = pipelineSQLBuilder.buildInventoryDumpSQL(schemaName, dumperConfig.getActualTableName(), dumperConfig.getUniqueKey(), uniqueKeyDataType, false);
+ String firstSQL = sqlBuilder.buildInventoryDumpSQL(schemaName, dumperConfig.getActualTableName(), dumperConfig.getUniqueKey(), dumperConfig.getUniqueKeyDataType(), true);
+ String laterSQL = sqlBuilder.buildInventoryDumpSQL(schemaName, dumperConfig.getActualTableName(), dumperConfig.getUniqueKey(), dumperConfig.getUniqueKeyDataType(), false);
IngestPosition<?> position = dumperConfig.getPosition();
- log.info("inventory dump, uniqueKeyDataType={}, firstSQL={}, laterSQL={}, position={}", uniqueKeyDataType, firstSQL, laterSQL, position);
+ log.info("Inventory dump, uniqueKeyDataType={}, firstSQL={}, laterSQL={}, position={}.", dumperConfig.getUniqueKeyDataType(), firstSQL, laterSQL, position);
if (position instanceof FinishedPosition) {
- log.info("It is already finished, ignore");
+ log.info("Ignored because of already finished.");
return;
}
- Object startUniqueKeyValue = getPositionBeginValue(position);
- try (Connection conn = dataSource.getConnection()) {
+ Object beginUniqueKeyValue = ((PrimaryKeyPosition<?>) position).getBeginValue();
+ try (Connection connection = dataSource.getConnection()) {
int round = 1;
Optional<Object> maxUniqueKeyValue;
- while ((maxUniqueKeyValue = dump0(conn, 1 == round ? firstSQL : laterSQL, dumperConfig.getUniqueKey(), uniqueKeyDataType, startUniqueKeyValue, round++)).isPresent()) {
- startUniqueKeyValue = maxUniqueKeyValue.get();
+ while ((maxUniqueKeyValue = dump(connection, 1 == round ? firstSQL : laterSQL, beginUniqueKeyValue, round++)).isPresent()) {
+ beginUniqueKeyValue = maxUniqueKeyValue.get();
if (!isRunning()) {
- log.info("inventory dump, running is false, break");
+ log.info("Broke because of inventory dump is not running.");
break;
}
}
- log.info("inventory dump done, round={}, maxUniqueKeyValue={}", round, maxUniqueKeyValue);
+ log.info("Inventory dump done, round={}, maxUniqueKeyValue={}.", round, maxUniqueKeyValue);
} catch (final SQLException ex) {
- log.error("inventory dump, ex caught, msg={}", ex.getMessage());
+ log.error("Inventory dump, ex caught, msg={}.", ex.getMessage());
throw new IngestException(ex);
} finally {
- log.info("inventory dump, before put FinishedRecord");
- pushRecord(new FinishedRecord(new FinishedPosition()));
+ log.info("Inventory dump, before put FinishedRecord.");
+ channel.pushRecord(new FinishedRecord(new FinishedPosition()));
}
}
@SneakyThrows(ConcurrentException.class)
- private PipelineTableMetaData getTableMetaData() {
- return tableMetaDataLazyInitializer.get();
- }
-
- private Optional<Object> dump0(final Connection conn, final String sql, final String uniqueKey, final int uniqueKeyDataType, final Object startUniqueKeyValue,
- final int round) throws SQLException {
- if (null != rateLimitAlgorithm) {
- rateLimitAlgorithm.intercept(JobOperationType.SELECT, 1);
+ private Optional<Object> dump(final Connection connection, final String sql, final Object beginUniqueKeyValue, final int round) throws SQLException {
+ if (null != dumperConfig.getRateLimitAlgorithm()) {
+ dumperConfig.getRateLimitAlgorithm().intercept(JobOperationType.SELECT, 1);
}
- PipelineTableMetaData tableMetaData = getTableMetaData();
- try (PreparedStatement preparedStatement = createPreparedStatement(conn, sql)) {
- preparedStatement.setFetchSize(batchSize);
- if (PipelineJdbcUtils.isIntegerColumn(uniqueKeyDataType)) {
- preparedStatement.setObject(1, startUniqueKeyValue);
- preparedStatement.setObject(2, getPositionEndValue(dumperConfig.getPosition()));
- preparedStatement.setInt(3, batchSize);
- } else if (PipelineJdbcUtils.isStringColumn(uniqueKeyDataType)) {
- preparedStatement.setObject(1, startUniqueKeyValue);
- preparedStatement.setInt(2, batchSize);
- } else {
- throw new UnsupportedPipelineJobUniqueKeyDataTypeException(uniqueKeyDataType);
- }
+ int batchSize = dumperConfig.getBatchSize();
+ PipelineTableMetaData tableMetaData = metaDataLoader.get();
+ try (PreparedStatement preparedStatement = connection.prepareStatement(sql, ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY)) {
+ setDialectParameters(preparedStatement);
+ setPreparedStatementParameters(preparedStatement, batchSize, beginUniqueKeyValue);
try (ResultSet resultSet = preparedStatement.executeQuery()) {
ResultSetMetaData resultSetMetaData = resultSet.getMetaData();
int rowCount = 0;
Object maxUniqueKeyValue = null;
- String logicTableName = dumperConfig.getLogicTableName();
while (resultSet.next()) {
- DataRecord record = new DataRecord(newPosition(resultSet), resultSetMetaData.getColumnCount());
- record.setType(IngestDataChangeType.INSERT);
- record.setTableName(logicTableName);
- maxUniqueKeyValue = columnValueReader.readValue(resultSet, resultSetMetaData, tableMetaData.getColumnMetaData(uniqueKey).getOrdinalPosition());
- for (int i = 1; i <= resultSetMetaData.getColumnCount(); i++) {
- boolean isUniqueKey = tableMetaData.getColumnMetaData(i).isUniqueKey();
- record.addColumn(new Column(resultSetMetaData.getColumnName(i), columnValueReader.readValue(resultSet, resultSetMetaData, i), true, isUniqueKey));
- }
- pushRecord(record);
+ channel.pushRecord(loadDataRecord(resultSet, resultSetMetaData, tableMetaData));
+ maxUniqueKeyValue = columnValueReader.readValue(resultSet, resultSetMetaData, tableMetaData.getColumnMetaData(dumperConfig.getUniqueKey()).getOrdinalPosition());
rowCount++;
if (!isRunning()) {
- log.info("dump, running is false, break");
+ log.info("Broke because of inventory dump is not running.");
break;
}
}
if (0 == round % 50) {
- log.info("dump, round={}, rowCount={}, maxUniqueKeyValue={}", round, rowCount, maxUniqueKeyValue);
+ log.info("Dumping, round={}, rowCount={}, maxUniqueKeyValue={}.", round, rowCount, maxUniqueKeyValue);
}
return Optional.ofNullable(maxUniqueKeyValue);
}
}
}
- private Object getPositionBeginValue(final IngestPosition<?> position) {
- return ((PrimaryKeyPosition<?>) position).getBeginValue();
+ private void setPreparedStatementParameters(final PreparedStatement preparedStatement, final int batchSize, final Object beginUniqueKeyValue) throws SQLException {
+ preparedStatement.setFetchSize(batchSize);
+ if (PipelineJdbcUtils.isIntegerColumn(dumperConfig.getUniqueKeyDataType())) {
+ preparedStatement.setObject(1, beginUniqueKeyValue);
+ preparedStatement.setObject(2, ((PrimaryKeyPosition<?>) dumperConfig.getPosition()).getEndValue());
+ preparedStatement.setInt(3, batchSize);
+ return;
+ }
+ if (PipelineJdbcUtils.isStringColumn(dumperConfig.getUniqueKeyDataType())) {
+ preparedStatement.setObject(1, beginUniqueKeyValue);
+ preparedStatement.setInt(2, batchSize);
+ return;
+ }
+ throw new UnsupportedPipelineJobUniqueKeyDataTypeException(dumperConfig.getUniqueKeyDataType());
}
- private Object getPositionEndValue(final IngestPosition<?> position) {
- return ((PrimaryKeyPosition<?>) position).getEndValue();
+ private DataRecord loadDataRecord(final ResultSet resultSet, final ResultSetMetaData resultSetMetaData, final PipelineTableMetaData tableMetaData) throws SQLException {
+ int columnCount = resultSetMetaData.getColumnCount();
+ DataRecord result = new DataRecord(newPosition(resultSet), columnCount);
+ result.setType(IngestDataChangeType.INSERT);
+ result.setTableName(dumperConfig.getLogicTableName());
+ for (int i = 1; i <= columnCount; i++) {
+ result.addColumn(new Column(resultSetMetaData.getColumnName(i), columnValueReader.readValue(resultSet, resultSetMetaData, i), true, tableMetaData.getColumnMetaData(i).isUniqueKey()));
+ }
+ return result;
}
- private IngestPosition<?> newPosition(final ResultSet rs) throws SQLException {
- return null == dumperConfig.getUniqueKey() ? new PlaceholderPosition()
- : PrimaryKeyPositionFactory.newInstance(rs.getObject(dumperConfig.getUniqueKey()), ((PrimaryKeyPosition<?>) dumperConfig.getPosition()).getEndValue());
+ private IngestPosition<?> newPosition(final ResultSet resultSet) throws SQLException {
+ return null == dumperConfig.getUniqueKey()
+ ? new PlaceholderPosition()
+ : PrimaryKeyPositionFactory.newInstance(resultSet.getObject(dumperConfig.getUniqueKey()), ((PrimaryKeyPosition<?>) dumperConfig.getPosition()).getEndValue());
}
- protected abstract PreparedStatement createPreparedStatement(Connection connection, String sql) throws SQLException;
-
- private void pushRecord(final Record record) {
- channel.pushRecord(record);
+ protected void setDialectParameters(final PreparedStatement preparedStatement) throws SQLException {
}
@Override
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/DefaultInventoryDumper.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/DefaultInventoryDumper.java
index 596f740a609..df1b8454175 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/DefaultInventoryDumper.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/DefaultInventoryDumper.java
@@ -22,10 +22,6 @@ import org.apache.shardingsphere.data.pipeline.api.ingest.channel.PipelineChanne
import org.apache.shardingsphere.data.pipeline.api.metadata.loader.PipelineTableMetaDataLoader;
import javax.sql.DataSource;
-import java.sql.Connection;
-import java.sql.PreparedStatement;
-import java.sql.ResultSet;
-import java.sql.SQLException;
/**
* Default inventory dumper.
@@ -36,9 +32,4 @@ public final class DefaultInventoryDumper extends AbstractInventoryDumper {
final DataSource dataSource, final PipelineTableMetaDataLoader metaDataLoader) {
super(inventoryDumperConfig, channel, dataSource, metaDataLoader);
}
-
- @Override
- protected PreparedStatement createPreparedStatement(final Connection connection, final String sql) throws SQLException {
- return connection.prepareStatement(sql, ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY);
- }
}
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLInventoryDumper.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLInventoryDumper.java
index 6fd46b68b3a..307653b2920 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLInventoryDumper.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLInventoryDumper.java
@@ -23,9 +23,7 @@ import org.apache.shardingsphere.data.pipeline.api.metadata.loader.PipelineTable
import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.AbstractInventoryDumper;
import javax.sql.DataSource;
-import java.sql.Connection;
import java.sql.PreparedStatement;
-import java.sql.ResultSet;
import java.sql.SQLException;
/**
@@ -33,15 +31,12 @@ import java.sql.SQLException;
*/
public final class MySQLInventoryDumper extends AbstractInventoryDumper {
- public MySQLInventoryDumper(final InventoryDumperConfiguration inventoryDumperConfig, final PipelineChannel channel,
- final DataSource dataSource, final PipelineTableMetaDataLoader metaDataLoader) {
- super(inventoryDumperConfig, channel, dataSource, metaDataLoader);
+ public MySQLInventoryDumper(final InventoryDumperConfiguration dumperConfig, final PipelineChannel channel, final DataSource dataSource, final PipelineTableMetaDataLoader metaDataLoader) {
+ super(dumperConfig, channel, dataSource, metaDataLoader);
}
@Override
- protected PreparedStatement createPreparedStatement(final Connection connection, final String sql) throws SQLException {
- PreparedStatement result = connection.prepareStatement(sql, ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY);
- result.setFetchSize(Integer.MIN_VALUE);
- return result;
+ protected void setDialectParameters(final PreparedStatement preparedStatement) throws SQLException {
+ preparedStatement.setFetchSize(Integer.MIN_VALUE);
}
}
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLInventoryDumperTest.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLInventoryDumperTest.java
index d8aaf019c89..5f861f0f51c 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLInventoryDumperTest.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLInventoryDumperTest.java
@@ -80,7 +80,8 @@ public final class MySQLInventoryDumperTest {
public void assertCreatePreparedStatement() throws SQLException {
Connection connection = mock(Connection.class);
when(connection.prepareStatement("", ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY)).thenReturn(mock(PreparedStatement.class));
- PreparedStatement preparedStatement = mysqlJdbcDumper.createPreparedStatement(connection, "");
+ PreparedStatement preparedStatement = connection.prepareStatement("", ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY);
+ mysqlJdbcDumper.setDialectParameters(preparedStatement);
verify(preparedStatement).setFetchSize(Integer.MIN_VALUE);
}
}
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLInventoryDumper.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLInventoryDumper.java
index 3cbf9a158b5..143fdc5172c 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLInventoryDumper.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLInventoryDumper.java
@@ -23,9 +23,7 @@ import org.apache.shardingsphere.data.pipeline.api.metadata.loader.PipelineTable
import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.AbstractInventoryDumper;
import javax.sql.DataSource;
-import java.sql.Connection;
import java.sql.PreparedStatement;
-import java.sql.ResultSet;
import java.sql.SQLException;
/**
@@ -39,9 +37,7 @@ public final class PostgreSQLInventoryDumper extends AbstractInventoryDumper {
}
@Override
- protected PreparedStatement createPreparedStatement(final Connection connection, final String sql) throws SQLException {
- PreparedStatement result = connection.prepareStatement(sql, ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY);
- result.setFetchSize(1);
- return result;
+ protected void setDialectParameters(final PreparedStatement preparedStatement) throws SQLException {
+ preparedStatement.setFetchSize(1);
}
}
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLJdbcDumperTest.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLJdbcDumperTest.java
index 36cd7d478ae..c04d4915aa2 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLJdbcDumperTest.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLJdbcDumperTest.java
@@ -77,7 +77,8 @@ public final class PostgreSQLJdbcDumperTest {
public void assertCreatePreparedStatement() throws SQLException {
try (
Connection connection = dataSource.getConnection();
- PreparedStatement preparedStatement = jdbcDumper.createPreparedStatement(connection, "SELECT * FROM t_order")) {
+ PreparedStatement preparedStatement = connection.prepareStatement("SELECT * FROM t_order")) {
+ jdbcDumper.setDialectParameters(preparedStatement);
assertThat(preparedStatement.getFetchSize(), is(1));
}
}