You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@shardingsphere.apache.org by to...@apache.org on 2022/10/03 03:45:34 UTC
[shardingsphere] branch master updated: Remove unnecessary lazy loading for InventoryDumper (#21323)
This is an automated email from the ASF dual-hosted git repository.
totalo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new cc88e9ab0f5 Remove unnecessary lazy loading for InventoryDumper (#21323)
cc88e9ab0f5 is described below
commit cc88e9ab0f5d2e929251e9a6eb200ddbb19d1ce9
Author: Liang Zhang <zh...@apache.org>
AuthorDate: Mon Oct 3 11:45:26 2022 +0800
Remove unnecessary lazy loading for InventoryDumper (#21323)
---
.../core/ingest/dumper/InventoryDumper.java | 22 ++++++----------------
1 file changed, 6 insertions(+), 16 deletions(-)
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/InventoryDumper.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/InventoryDumper.java
index 5d41113fb64..db08bca5125 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/InventoryDumper.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/InventoryDumper.java
@@ -19,10 +19,7 @@ package org.apache.shardingsphere.data.pipeline.core.ingest.dumper;
import lombok.AccessLevel;
import lombok.Getter;
-import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
-import org.apache.commons.lang3.concurrent.ConcurrentException;
-import org.apache.commons.lang3.concurrent.LazyInitializer;
import org.apache.shardingsphere.data.pipeline.api.config.ingest.InventoryDumperConfiguration;
import org.apache.shardingsphere.data.pipeline.api.datasource.config.impl.StandardPipelineDataSourceConfiguration;
import org.apache.shardingsphere.data.pipeline.api.executor.AbstractLifecycleExecutor;
@@ -75,7 +72,7 @@ public final class InventoryDumper extends AbstractLifecycleExecutor implements
private final ColumnValueReader columnValueReader;
- private final LazyInitializer<PipelineTableMetaData> metaDataLoader;
+ private final PipelineTableMetaDataLoader metaDataLoader;
public InventoryDumper(final InventoryDumperConfiguration dumperConfig, final PipelineChannel channel, final DataSource dataSource, final PipelineTableMetaDataLoader metaDataLoader) {
ShardingSpherePreconditions.checkState(StandardPipelineDataSourceConfiguration.class.equals(dumperConfig.getDataSourceConfig().getClass()),
@@ -85,13 +82,7 @@ public final class InventoryDumper extends AbstractLifecycleExecutor implements
this.dataSource = dataSource;
sqlBuilder = PipelineSQLBuilderFactory.getInstance(dumperConfig.getDataSourceConfig().getDatabaseType().getType());
columnValueReader = ColumnValueReaderFactory.getInstance(dumperConfig.getDataSourceConfig().getDatabaseType().getType());
- this.metaDataLoader = new LazyInitializer<PipelineTableMetaData>() {
-
- @Override
- protected PipelineTableMetaData initialize() {
- return metaDataLoader.getTableMetaData(dumperConfig.getSchemaName(new LogicTableName(dumperConfig.getLogicTableName())), dumperConfig.getActualTableName());
- }
- };
+ this.metaDataLoader = metaDataLoader;
}
@Override
@@ -105,11 +96,12 @@ public final class InventoryDumper extends AbstractLifecycleExecutor implements
log.info("Ignored because of already finished.");
return;
}
+ PipelineTableMetaData tableMetaData = metaDataLoader.getTableMetaData(dumperConfig.getSchemaName(new LogicTableName(dumperConfig.getLogicTableName())), dumperConfig.getActualTableName());
Object beginUniqueKeyValue = ((PrimaryKeyPosition<?>) position).getBeginValue();
+ int round = 1;
try (Connection connection = dataSource.getConnection()) {
- int round = 1;
Optional<Object> maxUniqueKeyValue;
- while ((maxUniqueKeyValue = dump(connection, 1 == round ? firstSQL : laterSQL, beginUniqueKeyValue, round++)).isPresent()) {
+ while ((maxUniqueKeyValue = dump(tableMetaData, connection, 1 == round ? firstSQL : laterSQL, beginUniqueKeyValue, round++)).isPresent()) {
beginUniqueKeyValue = maxUniqueKeyValue.get();
if (!isRunning()) {
log.info("Broke because of inventory dump is not running.");
@@ -126,13 +118,11 @@ public final class InventoryDumper extends AbstractLifecycleExecutor implements
}
}
- @SneakyThrows(ConcurrentException.class)
- private Optional<Object> dump(final Connection connection, final String sql, final Object beginUniqueKeyValue, final int round) throws SQLException {
+ private Optional<Object> dump(final PipelineTableMetaData tableMetaData, final Connection connection, final String sql, final Object beginUniqueKeyValue, final int round) throws SQLException {
if (null != dumperConfig.getRateLimitAlgorithm()) {
dumperConfig.getRateLimitAlgorithm().intercept(JobOperationType.SELECT, 1);
}
int batchSize = dumperConfig.getBatchSize();
- PipelineTableMetaData tableMetaData = metaDataLoader.get();
try (PreparedStatement preparedStatement = connection.prepareStatement(sql, ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY)) {
setParameters(preparedStatement, batchSize, beginUniqueKeyValue);
try (ResultSet resultSet = preparedStatement.executeQuery()) {