You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@shardingsphere.apache.org by du...@apache.org on 2023/05/18 04:39:05 UTC

[shardingsphere] branch master updated: Fix sonar issue of PipelineJobProgressPersistService (#25755)

This is an automated email from the ASF dual-hosted git repository.

duanzhengqiang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new acfe0995c9f Fix sonar issue of PipelineJobProgressPersistService (#25755)
acfe0995c9f is described below

commit acfe0995c9fbe8afad55d263c4ca734e8a330fc1
Author: Liang Zhang <zh...@apache.org>
AuthorDate: Thu May 18 12:38:57 2023 +0800

    Fix sonar issue of PipelineJobProgressPersistService (#25755)
---
 .../pipeline/core/job/AbstractPipelineJob.java     |  4 +-
 .../persist/PipelineJobProgressPersistService.java | 71 +++++++++++-----------
 2 files changed, 37 insertions(+), 38 deletions(-)

diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractPipelineJob.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractPipelineJob.java
index adb71754739..0b0d9bb394c 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractPipelineJob.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractPipelineJob.java
@@ -108,7 +108,7 @@ public abstract class AbstractPipelineJob implements PipelineJob {
             return false;
         }
         String jobId = tasksRunner.getJobItemContext().getJobId();
-        PipelineJobProgressPersistService.addJobProgressPersistContext(jobId, shardingItem);
+        PipelineJobProgressPersistService.add(jobId, shardingItem);
         PipelineDistributedBarrier.getInstance(PipelineJobIdUtils.parseContextKey(jobId)).persistEphemeralChildrenNode(PipelineMetaDataNode.getJobBarrierEnablePath(jobId), shardingItem);
         return true;
     }
@@ -155,7 +155,7 @@ public abstract class AbstractPipelineJob implements PipelineJob {
     
     private void innerClean() {
         tasksRunnerMap.clear();
-        PipelineJobProgressPersistService.removeJobProgressPersistContext(jobId);
+        PipelineJobProgressPersistService.remove(jobId);
     }
     
     protected abstract void doClean();
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/persist/PipelineJobProgressPersistService.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/persist/PipelineJobProgressPersistService.java
index f279bb00899..a1b31bf0549 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/persist/PipelineJobProgressPersistService.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/persist/PipelineJobProgressPersistService.java
@@ -48,65 +48,42 @@ public final class PipelineJobProgressPersistService {
     
     private static final ScheduledExecutorService JOB_PERSIST_EXECUTOR = Executors.newSingleThreadScheduledExecutor(ExecutorThreadFactoryBuilder.build("pipeline-progress-persist-%d"));
     
-    private static final long DELAY_SECONDS = 1;
+    private static final long DELAY_SECONDS = 1L;
     
     static {
         JOB_PERSIST_EXECUTOR.scheduleWithFixedDelay(new PersistJobContextRunnable(), 0, DELAY_SECONDS, TimeUnit.SECONDS);
     }
     
     /**
-     * Remove job progress persist context.
+     * Add job progress persist context.
      *
-     * @param jobId job id
+     * @param jobId job ID
+     * @param shardingItem sharding item
      */
-    public static void removeJobProgressPersistContext(final String jobId) {
-        JOB_PROGRESS_PERSIST_MAP.remove(jobId);
+    public static void add(final String jobId, final int shardingItem) {
+        JOB_PROGRESS_PERSIST_MAP.computeIfAbsent(jobId, key -> new ConcurrentHashMap<>()).put(shardingItem, new PipelineJobProgressPersistContext(jobId, shardingItem));
     }
     
     /**
-     * Add job progress persist context.
+     * Remove job progress persist context.
      *
-     * @param jobId job id
-     * @param shardingItem sharding item
+     * @param jobId job ID
      */
-    public static void addJobProgressPersistContext(final String jobId, final int shardingItem) {
-        JOB_PROGRESS_PERSIST_MAP.computeIfAbsent(jobId, key -> new ConcurrentHashMap<>()).put(shardingItem, new PipelineJobProgressPersistContext(jobId, shardingItem));
+    public static void remove(final String jobId) {
+        JOB_PROGRESS_PERSIST_MAP.remove(jobId);
     }
     
     /**
      * Notify persist.
      *
-     * @param jobId job id
+     * @param jobId job ID
      * @param shardingItem sharding item
      */
     public static void notifyPersist(final String jobId, final int shardingItem) {
         Map<Integer, PipelineJobProgressPersistContext> persistContextMap = JOB_PROGRESS_PERSIST_MAP.getOrDefault(jobId, Collections.emptyMap());
         PipelineJobProgressPersistContext persistContext = persistContextMap.get(shardingItem);
-        if (null == persistContext) {
-            return;
-        }
-        persistContext.getHasNewEvents().set(true);
-    }
-    
-    private static void persist(final String jobId, final int shardingItem, final PipelineJobProgressPersistContext persistContext) {
-        Long beforePersistingProgressMillis = persistContext.getBeforePersistingProgressMillis().get();
-        if ((null == beforePersistingProgressMillis || System.currentTimeMillis() - beforePersistingProgressMillis < TimeUnit.SECONDS.toMillis(DELAY_SECONDS))
-                && !persistContext.getHasNewEvents().get()) {
-            return;
-        }
-        Optional<PipelineJobItemContext> jobItemContext = PipelineJobCenter.getJobItemContext(jobId, shardingItem);
-        if (!jobItemContext.isPresent()) {
-            return;
-        }
-        if (null == beforePersistingProgressMillis) {
-            persistContext.getBeforePersistingProgressMillis().set(System.currentTimeMillis());
-        }
-        persistContext.getHasNewEvents().set(false);
-        long startTimeMillis = System.currentTimeMillis();
-        TypedSPILoader.getService(PipelineJobAPI.class, PipelineJobIdUtils.parseJobType(jobId).getTypeName()).persistJobItemProgress(jobItemContext.get());
-        persistContext.getBeforePersistingProgressMillis().set(null);
-        if (6 == ThreadLocalRandom.current().nextInt(100)) {
-            log.info("persist, jobId={}, shardingItem={}, cost {} ms", jobId, shardingItem, System.currentTimeMillis() - startTimeMillis);
+        if (null != persistContext) {
+            persistContext.getHasNewEvents().set(true);
         }
     }
     
@@ -118,5 +95,27 @@ public final class PipelineJobProgressPersistService {
                 entry.getValue().forEach((shardingItem, persistContext) -> persist(entry.getKey(), shardingItem, persistContext));
             }
         }
+        
+        private void persist(final String jobId, final int shardingItem, final PipelineJobProgressPersistContext persistContext) {
+            Long beforePersistingProgressMillis = persistContext.getBeforePersistingProgressMillis().get();
+            if ((null == beforePersistingProgressMillis || System.currentTimeMillis() - beforePersistingProgressMillis < TimeUnit.SECONDS.toMillis(DELAY_SECONDS))
+                    && !persistContext.getHasNewEvents().get()) {
+                return;
+            }
+            Optional<PipelineJobItemContext> jobItemContext = PipelineJobCenter.getJobItemContext(jobId, shardingItem);
+            if (!jobItemContext.isPresent()) {
+                return;
+            }
+            if (null == beforePersistingProgressMillis) {
+                persistContext.getBeforePersistingProgressMillis().set(System.currentTimeMillis());
+            }
+            persistContext.getHasNewEvents().set(false);
+            long startTimeMillis = System.currentTimeMillis();
+            TypedSPILoader.getService(PipelineJobAPI.class, PipelineJobIdUtils.parseJobType(jobId).getTypeName()).persistJobItemProgress(jobItemContext.get());
+            persistContext.getBeforePersistingProgressMillis().set(null);
+            if (6 == ThreadLocalRandom.current().nextInt(100)) {
+                log.info("persist, jobId={}, shardingItem={}, cost {} ms", jobId, shardingItem, System.currentTimeMillis() - startTimeMillis);
+            }
+        }
     }
 }