You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@shardingsphere.apache.org by su...@apache.org on 2023/05/13 04:21:42 UTC
[shardingsphere] branch master updated: Fix sonar issue of IncrementalTaskProgress (#25637)
This is an automated email from the ASF dual-hosted git repository.
sunnianjun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new f8ae1216ad4 Fix sonar issue of IncrementalTaskProgress (#25637)
f8ae1216ad4 is described below
commit f8ae1216ad4d9ae9a0a4821568003b99878acd33
Author: Liang Zhang <zh...@apache.org>
AuthorDate: Sat May 13 12:21:33 2023 +0800
Fix sonar issue of IncrementalTaskProgress (#25637)
* Fix sonar issue of IncrementalTaskProgress
---
.../data/pipeline/cdc/api/impl/CDCJobAPI.java | 3 +-
.../api/task/progress/IncrementalTaskProgress.java | 51 +++++++++++++++++++---
...YamlJobItemIncrementalTasksProgressSwapper.java | 3 +-
.../data/pipeline/core/task/IncrementalTask.java | 3 +-
.../command/query/PostgreSQLCommand.java | 2 +-
5 files changed, 49 insertions(+), 13 deletions(-)
diff --git a/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPI.java b/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPI.java
index b5b87dc36cc..6787dcfbe21 100644
--- a/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPI.java
+++ b/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPI.java
@@ -169,8 +169,7 @@ public final class CDCJobAPI extends AbstractInventoryIncrementalJobAPIImpl {
InventoryIncrementalJobItemProgress jobItemProgress = new InventoryIncrementalJobItemProgress();
jobItemProgress.setSourceDatabaseType(jobConfig.getSourceDatabaseType());
jobItemProgress.setDataSourceName(dumperConfig.getDataSourceName());
- IncrementalTaskProgress incrementalTaskProgress = new IncrementalTaskProgress();
- incrementalTaskProgress.setPosition(PipelineJobPreparerUtils.getIncrementalPosition(null, dumperConfig, dataSourceManager));
+ IncrementalTaskProgress incrementalTaskProgress = new IncrementalTaskProgress(PipelineJobPreparerUtils.getIncrementalPosition(null, dumperConfig, dataSourceManager));
jobItemProgress.setIncremental(new JobItemIncrementalTasksProgress(incrementalTaskProgress));
PipelineAPIFactory.getGovernanceRepositoryAPI(PipelineJobIdUtils.parseContextKey(jobId)).persistJobItemProgress(jobId, i,
YamlEngine.marshal(getJobItemProgressSwapper().swapToYamlConfiguration(jobItemProgress)));
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/api/task/progress/IncrementalTaskProgress.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/api/task/progress/IncrementalTaskProgress.java
index cdc7ecbaab8..167c862835d 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/api/task/progress/IncrementalTaskProgress.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/api/task/progress/IncrementalTaskProgress.java
@@ -17,18 +17,57 @@
package org.apache.shardingsphere.data.pipeline.api.task.progress;
-import lombok.Getter;
-import lombok.Setter;
import org.apache.shardingsphere.data.pipeline.api.ingest.position.IngestPosition;
+import java.util.concurrent.atomic.AtomicReference;
+
/**
* Incremental task progress.
*/
-@Getter
-@Setter
public final class IncrementalTaskProgress implements TaskProgress {
- private volatile IngestPosition<?> position;
+ private final AtomicReference<IngestPosition<?>> position = new AtomicReference<>();
+
+ private final AtomicReference<IncrementalTaskDelay> incrementalTaskDelay = new AtomicReference<>();
+
+ public IncrementalTaskProgress(final IngestPosition<?> position) {
+ this.position.set(position);
+ incrementalTaskDelay.set(new IncrementalTaskDelay());
+ }
+
+ /**
+ * Get position.
+ *
+ * @return position
+ */
+ public IngestPosition<?> getPosition() {
+ return position.get();
+ }
+
+ /**
+ * Set position.
+ *
+ * @param position position
+ */
+ public void setPosition(final IngestPosition<?> position) {
+ this.position.set(position);
+ }
+
+ /**
+ * Get incremental task delay.
+ *
+ * @return incremental task delay
+ */
+ public IncrementalTaskDelay getIncrementalTaskDelay() {
+ return incrementalTaskDelay.get();
+ }
- private IncrementalTaskDelay incrementalTaskDelay = new IncrementalTaskDelay();
+ /**
+ * Set incremental task delay.
+ *
+ * @param incrementalTaskDelay incremental task delay
+ */
+ public void setIncrementalTaskDelay(final IncrementalTaskDelay incrementalTaskDelay) {
+ this.incrementalTaskDelay.set(incrementalTaskDelay);
+ }
}
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/yaml/YamlJobItemIncrementalTasksProgressSwapper.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/yaml/YamlJobItemIncrementalTasksProgressSwapper.java
index e2c5b84ebe1..2c5a97f508a 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/yaml/YamlJobItemIncrementalTasksProgressSwapper.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/yaml/YamlJobItemIncrementalTasksProgressSwapper.java
@@ -58,10 +58,9 @@ public final class YamlJobItemIncrementalTasksProgressSwapper {
if (null == yamlProgress) {
return new JobItemIncrementalTasksProgress(null);
}
- IncrementalTaskProgress taskProgress = new IncrementalTaskProgress();
// TODO consider to remove parameter databaseType
PositionInitializer positionInitializer = PipelineTypedSPILoader.getDatabaseTypedService(PositionInitializer.class, databaseType);
- taskProgress.setPosition(positionInitializer.init(yamlProgress.getPosition()));
+ IncrementalTaskProgress taskProgress = new IncrementalTaskProgress(positionInitializer.init(yamlProgress.getPosition()));
taskProgress.setIncrementalTaskDelay(yamlProgress.getDelay());
return new JobItemIncrementalTasksProgress(taskProgress);
}
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/IncrementalTask.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/IncrementalTask.java
index 15a7d97a663..530c401da71 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/IncrementalTask.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/IncrementalTask.java
@@ -86,8 +86,7 @@ public final class IncrementalTask implements PipelineTask, AutoCloseable {
}
private IncrementalTaskProgress createIncrementalTaskProgress(final IngestPosition<?> position, final InventoryIncrementalJobItemProgress jobItemProgress) {
- IncrementalTaskProgress result = new IncrementalTaskProgress();
- result.setPosition(position);
+ IncrementalTaskProgress result = new IncrementalTaskProgress(position);
if (null != jobItemProgress && null != jobItemProgress.getIncremental()) {
Optional.ofNullable(jobItemProgress.getIncremental().getIncrementalTaskProgress())
.ifPresent(optional -> result.setIncrementalTaskDelay(jobItemProgress.getIncremental().getIncrementalTaskProgress().getIncrementalTaskDelay()));
diff --git a/proxy/frontend/type/postgresql/src/main/java/org/apache/shardingsphere/proxy/frontend/postgresql/command/query/PostgreSQLCommand.java b/proxy/frontend/type/postgresql/src/main/java/org/apache/shardingsphere/proxy/frontend/postgresql/command/query/PostgreSQLCommand.java
index ab7a5cedaa4..92ec40d9d89 100644
--- a/proxy/frontend/type/postgresql/src/main/java/org/apache/shardingsphere/proxy/frontend/postgresql/command/query/PostgreSQLCommand.java
+++ b/proxy/frontend/type/postgresql/src/main/java/org/apache/shardingsphere/proxy/frontend/postgresql/command/query/PostgreSQLCommand.java
@@ -154,7 +154,7 @@ public enum PostgreSQLCommand {
}
/*
- * Refer to <a href="https://bugs.openjdk.java.net/browse/JDK-8161372">JDK-8161372</a>.
+ * Refer to <a href="https://bugs.openjdk.java.net/browse/JDK-8161372">JDK-8161372</a>.
*/
@SuppressWarnings("OptionalAssignedToNull")
private static Optional<PostgreSQLCommand> getPostgreSQLCommand(final Class<? extends SQLStatement> sqlStatementClass) {