You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@shardingsphere.apache.org by du...@apache.org on 2023/05/18 04:39:05 UTC
[shardingsphere] branch master updated: Fix sonar issue of PipelineJobProgressPersistService (#25755)
This is an automated email from the ASF dual-hosted git repository.
duanzhengqiang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new acfe0995c9f Fix sonar issue of PipelineJobProgressPersistService (#25755)
acfe0995c9f is described below
commit acfe0995c9fbe8afad55d263c4ca734e8a330fc1
Author: Liang Zhang <zh...@apache.org>
AuthorDate: Thu May 18 12:38:57 2023 +0800
Fix sonar issue of PipelineJobProgressPersistService (#25755)
---
.../pipeline/core/job/AbstractPipelineJob.java | 4 +-
.../persist/PipelineJobProgressPersistService.java | 71 +++++++++++-----------
2 files changed, 37 insertions(+), 38 deletions(-)
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractPipelineJob.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractPipelineJob.java
index adb71754739..0b0d9bb394c 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractPipelineJob.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractPipelineJob.java
@@ -108,7 +108,7 @@ public abstract class AbstractPipelineJob implements PipelineJob {
return false;
}
String jobId = tasksRunner.getJobItemContext().getJobId();
- PipelineJobProgressPersistService.addJobProgressPersistContext(jobId, shardingItem);
+ PipelineJobProgressPersistService.add(jobId, shardingItem);
PipelineDistributedBarrier.getInstance(PipelineJobIdUtils.parseContextKey(jobId)).persistEphemeralChildrenNode(PipelineMetaDataNode.getJobBarrierEnablePath(jobId), shardingItem);
return true;
}
@@ -155,7 +155,7 @@ public abstract class AbstractPipelineJob implements PipelineJob {
private void innerClean() {
tasksRunnerMap.clear();
- PipelineJobProgressPersistService.removeJobProgressPersistContext(jobId);
+ PipelineJobProgressPersistService.remove(jobId);
}
protected abstract void doClean();
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/persist/PipelineJobProgressPersistService.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/persist/PipelineJobProgressPersistService.java
index f279bb00899..a1b31bf0549 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/persist/PipelineJobProgressPersistService.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/persist/PipelineJobProgressPersistService.java
@@ -48,65 +48,42 @@ public final class PipelineJobProgressPersistService {
private static final ScheduledExecutorService JOB_PERSIST_EXECUTOR = Executors.newSingleThreadScheduledExecutor(ExecutorThreadFactoryBuilder.build("pipeline-progress-persist-%d"));
- private static final long DELAY_SECONDS = 1;
+ private static final long DELAY_SECONDS = 1L;
static {
JOB_PERSIST_EXECUTOR.scheduleWithFixedDelay(new PersistJobContextRunnable(), 0, DELAY_SECONDS, TimeUnit.SECONDS);
}
/**
- * Remove job progress persist context.
+ * Add job progress persist context.
*
- * @param jobId job id
+ * @param jobId job ID
+ * @param shardingItem sharding item
*/
- public static void removeJobProgressPersistContext(final String jobId) {
- JOB_PROGRESS_PERSIST_MAP.remove(jobId);
+ public static void add(final String jobId, final int shardingItem) {
+ JOB_PROGRESS_PERSIST_MAP.computeIfAbsent(jobId, key -> new ConcurrentHashMap<>()).put(shardingItem, new PipelineJobProgressPersistContext(jobId, shardingItem));
}
/**
- * Add job progress persist context.
+ * Remove job progress persist context.
*
- * @param jobId job id
- * @param shardingItem sharding item
+ * @param jobId job ID
*/
- public static void addJobProgressPersistContext(final String jobId, final int shardingItem) {
- JOB_PROGRESS_PERSIST_MAP.computeIfAbsent(jobId, key -> new ConcurrentHashMap<>()).put(shardingItem, new PipelineJobProgressPersistContext(jobId, shardingItem));
+ public static void remove(final String jobId) {
+ JOB_PROGRESS_PERSIST_MAP.remove(jobId);
}
/**
* Notify persist.
*
- * @param jobId job id
+ * @param jobId job ID
* @param shardingItem sharding item
*/
public static void notifyPersist(final String jobId, final int shardingItem) {
Map<Integer, PipelineJobProgressPersistContext> persistContextMap = JOB_PROGRESS_PERSIST_MAP.getOrDefault(jobId, Collections.emptyMap());
PipelineJobProgressPersistContext persistContext = persistContextMap.get(shardingItem);
- if (null == persistContext) {
- return;
- }
- persistContext.getHasNewEvents().set(true);
- }
-
- private static void persist(final String jobId, final int shardingItem, final PipelineJobProgressPersistContext persistContext) {
- Long beforePersistingProgressMillis = persistContext.getBeforePersistingProgressMillis().get();
- if ((null == beforePersistingProgressMillis || System.currentTimeMillis() - beforePersistingProgressMillis < TimeUnit.SECONDS.toMillis(DELAY_SECONDS))
- && !persistContext.getHasNewEvents().get()) {
- return;
- }
- Optional<PipelineJobItemContext> jobItemContext = PipelineJobCenter.getJobItemContext(jobId, shardingItem);
- if (!jobItemContext.isPresent()) {
- return;
- }
- if (null == beforePersistingProgressMillis) {
- persistContext.getBeforePersistingProgressMillis().set(System.currentTimeMillis());
- }
- persistContext.getHasNewEvents().set(false);
- long startTimeMillis = System.currentTimeMillis();
- TypedSPILoader.getService(PipelineJobAPI.class, PipelineJobIdUtils.parseJobType(jobId).getTypeName()).persistJobItemProgress(jobItemContext.get());
- persistContext.getBeforePersistingProgressMillis().set(null);
- if (6 == ThreadLocalRandom.current().nextInt(100)) {
- log.info("persist, jobId={}, shardingItem={}, cost {} ms", jobId, shardingItem, System.currentTimeMillis() - startTimeMillis);
+ if (null != persistContext) {
+ persistContext.getHasNewEvents().set(true);
}
}
@@ -118,5 +95,27 @@ public final class PipelineJobProgressPersistService {
entry.getValue().forEach((shardingItem, persistContext) -> persist(entry.getKey(), shardingItem, persistContext));
}
}
+
+ private void persist(final String jobId, final int shardingItem, final PipelineJobProgressPersistContext persistContext) {
+ Long beforePersistingProgressMillis = persistContext.getBeforePersistingProgressMillis().get();
+ if ((null == beforePersistingProgressMillis || System.currentTimeMillis() - beforePersistingProgressMillis < TimeUnit.SECONDS.toMillis(DELAY_SECONDS))
+ && !persistContext.getHasNewEvents().get()) {
+ return;
+ }
+ Optional<PipelineJobItemContext> jobItemContext = PipelineJobCenter.getJobItemContext(jobId, shardingItem);
+ if (!jobItemContext.isPresent()) {
+ return;
+ }
+ if (null == beforePersistingProgressMillis) {
+ persistContext.getBeforePersistingProgressMillis().set(System.currentTimeMillis());
+ }
+ persistContext.getHasNewEvents().set(false);
+ long startTimeMillis = System.currentTimeMillis();
+ TypedSPILoader.getService(PipelineJobAPI.class, PipelineJobIdUtils.parseJobType(jobId).getTypeName()).persistJobItemProgress(jobItemContext.get());
+ persistContext.getBeforePersistingProgressMillis().set(null);
+ if (6 == ThreadLocalRandom.current().nextInt(100)) {
+ log.info("persist, jobId={}, shardingItem={}, cost {} ms", jobId, shardingItem, System.currentTimeMillis() - startTimeMillis);
+ }
+ }
}
}