You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@shardingsphere.apache.org by su...@apache.org on 2023/05/13 12:31:33 UTC
[shardingsphere] branch master updated: Fix sonar issue of DataSourceImporter (#25643)
This is an automated email from the ASF dual-hosted git repository.
sunnianjun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new 5c91008360e Fix sonar issue of DataSourceImporter (#25643)
5c91008360e is described below
commit 5c91008360e17e37aa1e0b10b1ccb2fe92fd92b2
Author: Liang Zhang <zh...@apache.org>
AuthorDate: Sat May 13 20:31:26 2023 +0800
Fix sonar issue of DataSourceImporter (#25643)
* Fix sonar issue of CalculationContext
* Fix sonar issue of DataSourceImporter
---
.../api/executor/AbstractLifecycleExecutor.java | 4 ++--
.../pipeline/core/importer/DataSourceImporter.java | 25 +++++++++++-----------
2 files changed, 15 insertions(+), 14 deletions(-)
diff --git a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/executor/AbstractLifecycleExecutor.java b/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/executor/AbstractLifecycleExecutor.java
index 89ad3b69224..461034577f3 100644
--- a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/executor/AbstractLifecycleExecutor.java
+++ b/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/executor/AbstractLifecycleExecutor.java
@@ -46,7 +46,7 @@ public abstract class AbstractLifecycleExecutor implements LifecycleExecutor {
private volatile long startTimeMillis;
@Override
- public void start() {
+ public final void start() {
running = true;
startTimeMillis = System.currentTimeMillis();
runBlocking();
@@ -78,7 +78,7 @@ public abstract class AbstractLifecycleExecutor implements LifecycleExecutor {
protected abstract void doStop() throws Exception;
- protected void cancelStatement(final Statement statement) throws SQLException {
+ protected final void cancelStatement(final Statement statement) throws SQLException {
if (null == statement || statement.isClosed()) {
return;
}
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/importer/DataSourceImporter.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/importer/DataSourceImporter.java
index 163b4fd39e8..5fc458ea99b 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/importer/DataSourceImporter.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/importer/DataSourceImporter.java
@@ -53,6 +53,7 @@ import java.util.Collections;
import java.util.List;
import java.util.Set;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
@@ -77,11 +78,11 @@ public final class DataSourceImporter extends AbstractLifecycleExecutor implemen
private final JobRateLimitAlgorithm rateLimitAlgorithm;
- private volatile Statement batchInsertStatement;
+ private final AtomicReference<Statement> batchInsertStatement = new AtomicReference<>();
- private volatile Statement updateStatement;
+ private final AtomicReference<Statement> updateStatement = new AtomicReference<>();
- private volatile Statement batchDeleteStatement;
+ private final AtomicReference<Statement> batchDeleteStatement = new AtomicReference<>();
public DataSourceImporter(final ImporterConfiguration importerConfig, final ImporterConnector importerConnector, final PipelineChannel channel,
final PipelineJobProgressListener jobProgressListener) {
@@ -210,7 +211,7 @@ public final class DataSourceImporter extends AbstractLifecycleExecutor implemen
DataRecord dataRecord = dataRecords.get(0);
String insertSql = pipelineSqlBuilder.buildInsertSQL(getSchemaName(dataRecord.getTableName()), dataRecord);
try (PreparedStatement preparedStatement = connection.prepareStatement(insertSql)) {
- batchInsertStatement = preparedStatement;
+ batchInsertStatement.set(preparedStatement);
preparedStatement.setQueryTimeout(30);
for (DataRecord each : dataRecords) {
for (int i = 0; i < each.getColumnCount(); i++) {
@@ -220,7 +221,7 @@ public final class DataSourceImporter extends AbstractLifecycleExecutor implemen
}
preparedStatement.executeBatch();
} finally {
- batchInsertStatement = null;
+ batchInsertStatement.set(null);
}
}
@@ -240,7 +241,7 @@ public final class DataSourceImporter extends AbstractLifecycleExecutor implemen
List<Column> updatedColumns = pipelineSqlBuilder.extractUpdatedColumns(record);
String updateSql = pipelineSqlBuilder.buildUpdateSQL(getSchemaName(record.getTableName()), record, conditionColumns);
try (PreparedStatement preparedStatement = connection.prepareStatement(updateSql)) {
- updateStatement = preparedStatement;
+ updateStatement.set(preparedStatement);
for (int i = 0; i < updatedColumns.size(); i++) {
preparedStatement.setObject(i + 1, updatedColumns.get(i).getValue());
}
@@ -259,7 +260,7 @@ public final class DataSourceImporter extends AbstractLifecycleExecutor implemen
log.warn("executeUpdate failed, updateCount={}, updateSql={}, updatedColumns={}, conditionColumns={}", updateCount, updateSql, updatedColumns, conditionColumns);
}
} finally {
- updateStatement = null;
+ updateStatement.set(null);
}
}
@@ -268,7 +269,7 @@ public final class DataSourceImporter extends AbstractLifecycleExecutor implemen
List<Column> conditionColumns = RecordUtils.extractConditionColumns(dataRecord, importerConfig.getShardingColumns(dataRecord.getTableName()));
String deleteSQL = pipelineSqlBuilder.buildDeleteSQL(getSchemaName(dataRecord.getTableName()), dataRecord, conditionColumns);
try (PreparedStatement preparedStatement = connection.prepareStatement(deleteSQL)) {
- batchDeleteStatement = preparedStatement;
+ batchDeleteStatement.set(preparedStatement);
preparedStatement.setQueryTimeout(30);
for (DataRecord each : dataRecords) {
for (int i = 0; i < conditionColumns.size(); i++) {
@@ -285,7 +286,7 @@ public final class DataSourceImporter extends AbstractLifecycleExecutor implemen
log.warn("batchDelete failed, counts={}, sql={}, conditionColumns={}", Arrays.toString(counts), deleteSQL, conditionColumns);
}
} finally {
- batchDeleteStatement = null;
+ batchDeleteStatement.set(null);
}
}
@@ -309,8 +310,8 @@ public final class DataSourceImporter extends AbstractLifecycleExecutor implemen
@Override
protected void doStop() throws SQLException {
- cancelStatement(batchInsertStatement);
- cancelStatement(updateStatement);
- cancelStatement(batchDeleteStatement);
+ cancelStatement(batchInsertStatement.get());
+ cancelStatement(updateStatement.get());
+ cancelStatement(batchDeleteStatement.get());
}
}