You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@shardingsphere.apache.org by su...@apache.org on 2023/05/13 12:31:33 UTC

[shardingsphere] branch master updated: Fix sonar issue of DataSourceImporter (#25643)

This is an automated email from the ASF dual-hosted git repository.

sunnianjun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new 5c91008360e Fix sonar issue of DataSourceImporter (#25643)
5c91008360e is described below

commit 5c91008360e17e37aa1e0b10b1ccb2fe92fd92b2
Author: Liang Zhang <zh...@apache.org>
AuthorDate: Sat May 13 20:31:26 2023 +0800

    Fix sonar issue of DataSourceImporter (#25643)
    
    * Fix sonar issue of CalculationContext
    
    * Fix sonar issue of DataSourceImporter
---
 .../api/executor/AbstractLifecycleExecutor.java    |  4 ++--
 .../pipeline/core/importer/DataSourceImporter.java | 25 +++++++++++-----------
 2 files changed, 15 insertions(+), 14 deletions(-)

diff --git a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/executor/AbstractLifecycleExecutor.java b/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/executor/AbstractLifecycleExecutor.java
index 89ad3b69224..461034577f3 100644
--- a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/executor/AbstractLifecycleExecutor.java
+++ b/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/executor/AbstractLifecycleExecutor.java
@@ -46,7 +46,7 @@ public abstract class AbstractLifecycleExecutor implements LifecycleExecutor {
     private volatile long startTimeMillis;
     
     @Override
-    public void start() {
+    public final void start() {
         running = true;
         startTimeMillis = System.currentTimeMillis();
         runBlocking();
@@ -78,7 +78,7 @@ public abstract class AbstractLifecycleExecutor implements LifecycleExecutor {
     
     protected abstract void doStop() throws Exception;
     
-    protected void cancelStatement(final Statement statement) throws SQLException {
+    protected final void cancelStatement(final Statement statement) throws SQLException {
         if (null == statement || statement.isClosed()) {
             return;
         }
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/importer/DataSourceImporter.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/importer/DataSourceImporter.java
index 163b4fd39e8..5fc458ea99b 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/importer/DataSourceImporter.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/importer/DataSourceImporter.java
@@ -53,6 +53,7 @@ import java.util.Collections;
 import java.util.List;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
 
@@ -77,11 +78,11 @@ public final class DataSourceImporter extends AbstractLifecycleExecutor implemen
     
     private final JobRateLimitAlgorithm rateLimitAlgorithm;
     
-    private volatile Statement batchInsertStatement;
+    private final AtomicReference<Statement> batchInsertStatement = new AtomicReference<>();
     
-    private volatile Statement updateStatement;
+    private final AtomicReference<Statement> updateStatement = new AtomicReference<>();
     
-    private volatile Statement batchDeleteStatement;
+    private final AtomicReference<Statement> batchDeleteStatement = new AtomicReference<>();
     
     public DataSourceImporter(final ImporterConfiguration importerConfig, final ImporterConnector importerConnector, final PipelineChannel channel,
                               final PipelineJobProgressListener jobProgressListener) {
@@ -210,7 +211,7 @@ public final class DataSourceImporter extends AbstractLifecycleExecutor implemen
         DataRecord dataRecord = dataRecords.get(0);
         String insertSql = pipelineSqlBuilder.buildInsertSQL(getSchemaName(dataRecord.getTableName()), dataRecord);
         try (PreparedStatement preparedStatement = connection.prepareStatement(insertSql)) {
-            batchInsertStatement = preparedStatement;
+            batchInsertStatement.set(preparedStatement);
             preparedStatement.setQueryTimeout(30);
             for (DataRecord each : dataRecords) {
                 for (int i = 0; i < each.getColumnCount(); i++) {
@@ -220,7 +221,7 @@ public final class DataSourceImporter extends AbstractLifecycleExecutor implemen
             }
             preparedStatement.executeBatch();
         } finally {
-            batchInsertStatement = null;
+            batchInsertStatement.set(null);
         }
     }
     
@@ -240,7 +241,7 @@ public final class DataSourceImporter extends AbstractLifecycleExecutor implemen
         List<Column> updatedColumns = pipelineSqlBuilder.extractUpdatedColumns(record);
         String updateSql = pipelineSqlBuilder.buildUpdateSQL(getSchemaName(record.getTableName()), record, conditionColumns);
         try (PreparedStatement preparedStatement = connection.prepareStatement(updateSql)) {
-            updateStatement = preparedStatement;
+            updateStatement.set(preparedStatement);
             for (int i = 0; i < updatedColumns.size(); i++) {
                 preparedStatement.setObject(i + 1, updatedColumns.get(i).getValue());
             }
@@ -259,7 +260,7 @@ public final class DataSourceImporter extends AbstractLifecycleExecutor implemen
                 log.warn("executeUpdate failed, updateCount={}, updateSql={}, updatedColumns={}, conditionColumns={}", updateCount, updateSql, updatedColumns, conditionColumns);
             }
         } finally {
-            updateStatement = null;
+            updateStatement.set(null);
         }
     }
     
@@ -268,7 +269,7 @@ public final class DataSourceImporter extends AbstractLifecycleExecutor implemen
         List<Column> conditionColumns = RecordUtils.extractConditionColumns(dataRecord, importerConfig.getShardingColumns(dataRecord.getTableName()));
         String deleteSQL = pipelineSqlBuilder.buildDeleteSQL(getSchemaName(dataRecord.getTableName()), dataRecord, conditionColumns);
         try (PreparedStatement preparedStatement = connection.prepareStatement(deleteSQL)) {
-            batchDeleteStatement = preparedStatement;
+            batchDeleteStatement.set(preparedStatement);
             preparedStatement.setQueryTimeout(30);
             for (DataRecord each : dataRecords) {
                 for (int i = 0; i < conditionColumns.size(); i++) {
@@ -285,7 +286,7 @@ public final class DataSourceImporter extends AbstractLifecycleExecutor implemen
                 log.warn("batchDelete failed, counts={}, sql={}, conditionColumns={}", Arrays.toString(counts), deleteSQL, conditionColumns);
             }
         } finally {
-            batchDeleteStatement = null;
+            batchDeleteStatement.set(null);
         }
     }
     
@@ -309,8 +310,8 @@ public final class DataSourceImporter extends AbstractLifecycleExecutor implemen
     
     @Override
     protected void doStop() throws SQLException {
-        cancelStatement(batchInsertStatement);
-        cancelStatement(updateStatement);
-        cancelStatement(batchDeleteStatement);
+        cancelStatement(batchInsertStatement.get());
+        cancelStatement(updateStatement.get());
+        cancelStatement(batchDeleteStatement.get());
     }
 }