Posted to notifications@shardingsphere.apache.org by GitBox <gi...@apache.org> on 2022/07/29 10:45:49 UTC

[GitHub] [shardingsphere] azexcy opened a new pull request, #19695: Extract job progress persistence from RuleAlteredJobSchedulerCenter

azexcy opened a new pull request, #19695:
URL: https://github.com/apache/shardingsphere/pull/19695

   Related to #19457.
   
   Changes proposed in this pull request:
   - Use `JobPersistCallback` instead of scheduled persistence.
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@shardingsphere.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [shardingsphere] codecov-commenter commented on pull request #19695: Extract job progress persistence from RuleAlteredJobSchedulerCenter

Posted by GitBox <gi...@apache.org>.
codecov-commenter commented on PR #19695:
URL: https://github.com/apache/shardingsphere/pull/19695#issuecomment-1200127341

   # [Codecov](https://codecov.io/gh/apache/shardingsphere/pull/19695?src=pr&el=h1&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) Report
   > Merging [#19695](https://codecov.io/gh/apache/shardingsphere/pull/19695?src=pr&el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (2d6fc96) into [master](https://codecov.io/gh/apache/shardingsphere/commit/4d0e3bd2117cdf33cf9a7790820632abc099d166?el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (4d0e3bd) will **increase** coverage by `0.01%`.
   > The diff coverage is `0.00%`.
   
   ```diff
   @@             Coverage Diff              @@
   ##             master   #19695      +/-   ##
   ============================================
   + Coverage     60.01%   60.02%   +0.01%     
   - Complexity     2387     2389       +2     
   ============================================
     Files          3847     3848       +1     
     Lines         54789    54851      +62     
     Branches       7660     7661       +1     
   ============================================
   + Hits          32879    32927      +48     
   - Misses        19091    19099       +8     
   - Partials       2819     2825       +6     
   ```
   
   
   | [Impacted Files](https://codecov.io/gh/apache/shardingsphere/pull/19695?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) | Coverage Δ | |
   |---|---|---|
   | [.../data/pipeline/core/importer/AbstractImporter.java](https://codecov.io/gh/apache/shardingsphere/pull/19695/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-c2hhcmRpbmdzcGhlcmUta2VybmVsL3NoYXJkaW5nc3BoZXJlLWRhdGEtcGlwZWxpbmUvc2hhcmRpbmdzcGhlcmUtZGF0YS1waXBlbGluZS1jb3JlL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9zaGFyZGluZ3NwaGVyZS9kYXRhL3BpcGVsaW5lL2NvcmUvaW1wb3J0ZXIvQWJzdHJhY3RJbXBvcnRlci5qYXZh) | `0.00% <0.00%> (ø)` | |
   | [...phere/data/pipeline/core/task/IncrementalTask.java](https://codecov.io/gh/apache/shardingsphere/pull/19695/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-c2hhcmRpbmdzcGhlcmUta2VybmVsL3NoYXJkaW5nc3BoZXJlLWRhdGEtcGlwZWxpbmUvc2hhcmRpbmdzcGhlcmUtZGF0YS1waXBlbGluZS1jb3JlL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9zaGFyZGluZ3NwaGVyZS9kYXRhL3BpcGVsaW5lL2NvcmUvdGFzay9JbmNyZW1lbnRhbFRhc2suamF2YQ==) | `0.00% <0.00%> (ø)` | |
   | [...gsphere/data/pipeline/core/task/InventoryTask.java](https://codecov.io/gh/apache/shardingsphere/pull/19695/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-c2hhcmRpbmdzcGhlcmUta2VybmVsL3NoYXJkaW5nc3BoZXJlLWRhdGEtcGlwZWxpbmUvc2hhcmRpbmdzcGhlcmUtZGF0YS1waXBlbGluZS1jb3JlL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9zaGFyZGluZ3NwaGVyZS9kYXRhL3BpcGVsaW5lL2NvcmUvdGFzay9JbnZlbnRvcnlUYXNrLmphdmE=) | `0.00% <0.00%> (ø)` | |
   | [...ario/rulealtered/RuleAlteredJobPersistService.java](https://codecov.io/gh/apache/shardingsphere/pull/19695/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-c2hhcmRpbmdzcGhlcmUta2VybmVsL3NoYXJkaW5nc3BoZXJlLWRhdGEtcGlwZWxpbmUvc2hhcmRpbmdzcGhlcmUtZGF0YS1waXBlbGluZS1jb3JlL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9zaGFyZGluZ3NwaGVyZS9kYXRhL3BpcGVsaW5lL3NjZW5hcmlvL3J1bGVhbHRlcmVkL1J1bGVBbHRlcmVkSm9iUGVyc2lzdFNlcnZpY2UuamF2YQ==) | `0.00% <0.00%> (ø)` | |
   | [...e/scenario/rulealtered/RuleAlteredJobPreparer.java](https://codecov.io/gh/apache/shardingsphere/pull/19695/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-c2hhcmRpbmdzcGhlcmUta2VybmVsL3NoYXJkaW5nc3BoZXJlLWRhdGEtcGlwZWxpbmUvc2hhcmRpbmdzcGhlcmUtZGF0YS1waXBlbGluZS1jb3JlL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9zaGFyZGluZ3NwaGVyZS9kYXRhL3BpcGVsaW5lL3NjZW5hcmlvL3J1bGVhbHRlcmVkL1J1bGVBbHRlcmVkSm9iUHJlcGFyZXIuamF2YQ==) | `0.00% <0.00%> (ø)` | |
   | [...rio/rulealtered/RuleAlteredJobSchedulerCenter.java](https://codecov.io/gh/apache/shardingsphere/pull/19695/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-c2hhcmRpbmdzcGhlcmUta2VybmVsL3NoYXJkaW5nc3BoZXJlLWRhdGEtcGlwZWxpbmUvc2hhcmRpbmdzcGhlcmUtZGF0YS1waXBlbGluZS1jb3JlL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9zaGFyZGluZ3NwaGVyZS9kYXRhL3BpcGVsaW5lL3NjZW5hcmlvL3J1bGVhbHRlcmVkL1J1bGVBbHRlcmVkSm9iU2NoZWR1bGVyQ2VudGVyLmphdmE=) | `0.00% <0.00%> (ø)` | |
   | [...rio/rulealtered/prepare/InventoryTaskSplitter.java](https://codecov.io/gh/apache/shardingsphere/pull/19695/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-c2hhcmRpbmdzcGhlcmUta2VybmVsL3NoYXJkaW5nc3BoZXJlLWRhdGEtcGlwZWxpbmUvc2hhcmRpbmdzcGhlcmUtZGF0YS1waXBlbGluZS1jb3JlL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9zaGFyZGluZ3NwaGVyZS9kYXRhL3BpcGVsaW5lL3NjZW5hcmlvL3J1bGVhbHRlcmVkL3ByZXBhcmUvSW52ZW50b3J5VGFza1NwbGl0dGVyLmphdmE=) | `0.00% <0.00%> (ø)` | |
   | [...ere/scaling/core/job/importer/ImporterFactory.java](https://codecov.io/gh/apache/shardingsphere/pull/19695/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-c2hhcmRpbmdzcGhlcmUta2VybmVsL3NoYXJkaW5nc3BoZXJlLWRhdGEtcGlwZWxpbmUvc2hhcmRpbmdzcGhlcmUtZGF0YS1waXBlbGluZS1jb3JlL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9zaGFyZGluZ3NwaGVyZS9zY2FsaW5nL2NvcmUvam9iL2ltcG9ydGVyL0ltcG9ydGVyRmFjdG9yeS5qYXZh) | `0.00% <0.00%> (ø)` | |
   | [...ling/core/job/persist/AsyncJobPersistCallback.java](https://codecov.io/gh/apache/shardingsphere/pull/19695/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-c2hhcmRpbmdzcGhlcmUta2VybmVsL3NoYXJkaW5nc3BoZXJlLWRhdGEtcGlwZWxpbmUvc2hhcmRpbmdzcGhlcmUtZGF0YS1waXBlbGluZS1jb3JlL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9zaGFyZGluZ3NwaGVyZS9zY2FsaW5nL2NvcmUvam9iL3BlcnNpc3QvQXN5bmNKb2JQZXJzaXN0Q2FsbGJhY2suamF2YQ==) | `0.00% <0.00%> (ø)` | |
   | [...re/data/pipeline/mysql/importer/MySQLImporter.java](https://codecov.io/gh/apache/shardingsphere/pull/19695/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-c2hhcmRpbmdzcGhlcmUta2VybmVsL3NoYXJkaW5nc3BoZXJlLWRhdGEtcGlwZWxpbmUvc2hhcmRpbmdzcGhlcmUtZGF0YS1waXBlbGluZS1kaWFsZWN0L3NoYXJkaW5nc3BoZXJlLWRhdGEtcGlwZWxpbmUtbXlzcWwvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL3NoYXJkaW5nc3BoZXJlL2RhdGEvcGlwZWxpbmUvbXlzcWwvaW1wb3J0ZXIvTXlTUUxJbXBvcnRlci5qYXZh) | `0.00% <0.00%> (ø)` | |
   | ... and [21 more](https://codecov.io/gh/apache/shardingsphere/pull/19695/diff?src=pr&el=tree-more&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) | |
   
   




[GitHub] [shardingsphere] sandynz commented on a diff in pull request #19695: Extract job progress persistence from RuleAlteredJobSchedulerCenter

Posted by GitBox <gi...@apache.org>.
sandynz commented on code in PR #19695:
URL: https://github.com/apache/shardingsphere/pull/19695#discussion_r933116218


##########
shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/job/persist/JobPersistCallback.java:
##########
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.data.pipeline.api.job.persist;
+
+public interface JobPersistCallback {

Review Comment:
   Javadoc is required for interfaces and classes.



##########
shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobPersistService.java:
##########
@@ -0,0 +1,175 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.data.pipeline.scenario.rulealtered;
+
+import lombok.extern.slf4j.Slf4j;
+import org.apache.shardingsphere.data.pipeline.api.job.persist.JobPersistIntervalParameter;
+import org.apache.shardingsphere.data.pipeline.core.api.GovernanceRepositoryAPI;
+import org.apache.shardingsphere.data.pipeline.core.api.PipelineAPIFactory;
+import org.apache.shardingsphere.infra.executor.kernel.thread.ExecutorThreadFactoryBuilder;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Optional;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Executor;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Rule altered job persist service.
+ */
+
+@Slf4j
+public final class RuleAlteredJobPersistService {
+    
+    private static final GovernanceRepositoryAPI REPOSITORY_API = PipelineAPIFactory.getGovernanceRepositoryAPI();
+    
+    private static final Executor SINGLE_EXECUTOR = Executors.newSingleThreadExecutor(ExecutorThreadFactoryBuilder.build("scaling-job-single-%d"));
+    
+    private static final ScheduledExecutorService JOB_PERSIST_EXECUTOR = Executors.newSingleThreadScheduledExecutor(ExecutorThreadFactoryBuilder.build("scaling-job-schedule-%d"));
+    
+    private static final Map<String, Map<Integer, RuleAlteredJobScheduler>> JOB_SCHEDULER_MAP = new ConcurrentHashMap<>();
+    
+    private static final Map<String, Map<Integer, JobPersistIntervalParameter>> JOB_PERSIST_MAP = new ConcurrentHashMap<>();
+    
+    private static final int MIN_PERSIST_INTERVAL_MILLIS = 1000;
+    
+    static {
+        JOB_PERSIST_EXECUTOR.scheduleWithFixedDelay(new PersistJobContextRunnable(), 5, 2, TimeUnit.SECONDS);

Review Comment:
   Could we reduce the delay to 1 second?
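
For reference, a minimal sketch of what the suggested change amounts to: `scheduleWithFixedDelay` takes an initial delay and a delay between the end of one run and the start of the next. The class and method names below are hypothetical, the `Runnable` is a stand-in for `PersistJobContextRunnable`, and the initial delay is set to 0 (instead of the 5 seconds in the diff) only so the sketch finishes quickly.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch class, not part of the PR.
final class FixedDelayPersistSketch {

    // Returns true if the periodic task ran at least twice within five seconds.
    static boolean runsTwiceWithinFiveSeconds() {
        ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
        CountDownLatch latch = new CountDownLatch(2);
        try {
            // Delay between runs is 1 second instead of the 2 seconds in the diff.
            // latch::countDown stands in for the real persist runnable.
            executor.scheduleWithFixedDelay(latch::countDown, 0, 1, TimeUnit.SECONDS);
            return latch.await(5, TimeUnit.SECONDS);
        } catch (final InterruptedException ex) {
            Thread.currentThread().interrupt();
            return false;
        } finally {
            executor.shutdownNow();
        }
    }
}
```

With a fixed delay, the effective period is the delay plus the time one run takes, so a slow persist call stretches the interval rather than causing overlapping runs.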



##########
shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobPersistService.java:
##########
@@ -0,0 +1,175 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.data.pipeline.scenario.rulealtered;
+
+import lombok.extern.slf4j.Slf4j;
+import org.apache.shardingsphere.data.pipeline.api.job.persist.JobPersistIntervalParameter;
+import org.apache.shardingsphere.data.pipeline.core.api.GovernanceRepositoryAPI;
+import org.apache.shardingsphere.data.pipeline.core.api.PipelineAPIFactory;
+import org.apache.shardingsphere.infra.executor.kernel.thread.ExecutorThreadFactoryBuilder;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Optional;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Executor;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Rule altered job persist service.
+ */
+
+@Slf4j
+public final class RuleAlteredJobPersistService {
+    
+    private static final GovernanceRepositoryAPI REPOSITORY_API = PipelineAPIFactory.getGovernanceRepositoryAPI();
+    
+    private static final Executor SINGLE_EXECUTOR = Executors.newSingleThreadExecutor(ExecutorThreadFactoryBuilder.build("scaling-job-single-%d"));
+    
+    private static final ScheduledExecutorService JOB_PERSIST_EXECUTOR = Executors.newSingleThreadScheduledExecutor(ExecutorThreadFactoryBuilder.build("scaling-job-schedule-%d"));
+    
+    private static final Map<String, Map<Integer, RuleAlteredJobScheduler>> JOB_SCHEDULER_MAP = new ConcurrentHashMap<>();
+    
+    private static final Map<String, Map<Integer, JobPersistIntervalParameter>> JOB_PERSIST_MAP = new ConcurrentHashMap<>();
+    
+    private static final int MIN_PERSIST_INTERVAL_MILLIS = 1000;
+    
+    static {
+        JOB_PERSIST_EXECUTOR.scheduleWithFixedDelay(new PersistJobContextRunnable(), 5, 2, TimeUnit.SECONDS);
+    }
+    
+    /**
+     * Get job schedule.
+     *
+     * @param jobId job id
+     * @param shardingItem sharding item
+     * @return job schedule
+     */
+    public static Optional<RuleAlteredJobScheduler> getJobSchedule(final String jobId, final int shardingItem) {
+        Map<Integer, RuleAlteredJobScheduler> schedulerMap = JOB_SCHEDULER_MAP.computeIfAbsent(jobId, key -> new ConcurrentHashMap<>());
+        return Optional.ofNullable(schedulerMap.get(shardingItem));
+    }
+    
+    /**
+     * List job schedules.
+     *
+     * @param jobId job id
+     * @return job schedules belong this job id
+     */
+    public static Collection<RuleAlteredJobScheduler> listJobSchedule(final String jobId) {
+        Map<Integer, RuleAlteredJobScheduler> schedulerMap = JOB_SCHEDULER_MAP.computeIfAbsent(jobId, key -> new ConcurrentHashMap<>());
+        return schedulerMap.values();
+    }
+    
+    /**
+     * Remove job schedule map by job id.
+     *
+     * @param jobId job id
+     */
+    public static void removeJobSchedule(final String jobId) {
+        log.info("Remove job schedule by job id: {}", jobId);
+        JOB_SCHEDULER_MAP.remove(jobId);
+        JOB_PERSIST_MAP.remove(jobId);
+    }
+    
+    /**
+     * Add job schedule.
+     *
+     * @param jobId job id
+     * @param shardingItem sharding item
+     * @param jobScheduler job schedule
+     */
+    public static void addJobSchedule(final String jobId, final int shardingItem, final RuleAlteredJobScheduler jobScheduler) {
+        log.info("Add job schedule, jobId={}, shardingItem={}", jobId, shardingItem);
+        JOB_SCHEDULER_MAP.computeIfAbsent(jobId, key -> new ConcurrentHashMap<>()).put(shardingItem, jobScheduler);
+        JOB_PERSIST_MAP.computeIfAbsent(jobId, key -> new ConcurrentHashMap<>()).put(shardingItem, new JobPersistIntervalParameter(jobId, shardingItem));
+    }
+    
+    /**
+     * Persist job process, may not be implemented immediately, depending on persist interval.
+     *
+     * @param jobId job id
+     * @param shardingItem sharding item
+     * @param currentTimeMills current time mills
+     */
+    public static void triggerPersist(final String jobId, final int shardingItem, final long currentTimeMills) {
+        Map<Integer, JobPersistIntervalParameter> intervalParamMap = JOB_PERSIST_MAP.getOrDefault(jobId, Collections.emptyMap());
+        JobPersistIntervalParameter parameter = intervalParamMap.get(shardingItem);
+        if (null == parameter) {
+            log.debug("Persist interval parameter is null, jobId={}, shardingItem={}", jobId, shardingItem);
+            return;
+        }
+        parameter.getAlreadyPersisted().compareAndSet(true, false);
+        if ((currentTimeMills - parameter.getPersistTime().get()) < MIN_PERSIST_INTERVAL_MILLIS) {
+            return;
+        }
+        if (parameter.getLock().tryLock()) {
+            try {
+                SINGLE_EXECUTOR.execute(() -> persist(jobId, shardingItem, currentTimeMills, parameter));
+                // CHECKSTYLE:OFF
+            } catch (final RuntimeException ex) {
+                // CHECKSTYLE:ON
+                log.error("Persist job process failed, job id: {}, sharding item: {}", jobId, shardingItem, ex);
+            } finally {
+                parameter.getLock().unlock();
+            }
+        }

Review Comment:
   1. Is it necessary to persist here, given that `JOB_PERSIST_EXECUTOR` already exists?
   
   2. There might be duplicated persistence under highly concurrent invocations.
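
One way to read the two points above: in the quoted code the lock is released as soon as `SINGLE_EXECUTOR.execute(...)` returns, so it does not cover the asynchronous `persist` call itself. A minimal sketch of an alternative, assuming `triggerPersist` only marks the progress as changed and the single scheduled thread does all the persisting, could look like this. All names below are hypothetical, and the counter stands in for `REPOSITORY_API.persistJobProgress(...)`.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch class, not part of the PR.
final class DirtyFlagPersistSketch {

    // True when progress has changed since the last persist.
    private final AtomicBoolean dirty = new AtomicBoolean(false);

    // Counts persist calls; stands in for the real repository write.
    private final AtomicInteger persistCount = new AtomicInteger();

    // Called from task threads, possibly concurrently: only marks the state, never persists.
    void triggerPersist() {
        dirty.set(true);
    }

    // Called from the single scheduled thread.
    void persistIfDirty() {
        // compareAndSet clears the flag and claims the work in one atomic step,
        // so many triggers between two runs result in a single persist.
        if (dirty.compareAndSet(true, false)) {
            persistCount.incrementAndGet();
        }
    }

    int getPersistCount() {
        return persistCount.get();
    }
}
```

Under this assumption no lock is needed: a trigger that arrives while a persist is in progress simply sets the flag again and is picked up on the next scheduled run.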



##########
shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/fixture/FixtureJobPersistCallback.java:
##########
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.data.pipeline.core.fixture;
+
+import org.apache.shardingsphere.data.pipeline.api.job.persist.JobPersistCallback;
+
+public class FixtureJobPersistCallback implements JobPersistCallback {

Review Comment:
   `final` could be added to the class.



##########
shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobPersistService.java:
##########
@@ -0,0 +1,175 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.data.pipeline.scenario.rulealtered;
+
+import lombok.extern.slf4j.Slf4j;
+import org.apache.shardingsphere.data.pipeline.api.job.persist.JobPersistIntervalParameter;
+import org.apache.shardingsphere.data.pipeline.core.api.GovernanceRepositoryAPI;
+import org.apache.shardingsphere.data.pipeline.core.api.PipelineAPIFactory;
+import org.apache.shardingsphere.infra.executor.kernel.thread.ExecutorThreadFactoryBuilder;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Optional;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Executor;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Rule altered job persist service.
+ */
+
+@Slf4j
+public final class RuleAlteredJobPersistService {
+    
+    private static final GovernanceRepositoryAPI REPOSITORY_API = PipelineAPIFactory.getGovernanceRepositoryAPI();
+    
+    private static final Executor SINGLE_EXECUTOR = Executors.newSingleThreadExecutor(ExecutorThreadFactoryBuilder.build("scaling-job-single-%d"));
+    
+    private static final ScheduledExecutorService JOB_PERSIST_EXECUTOR = Executors.newSingleThreadScheduledExecutor(ExecutorThreadFactoryBuilder.build("scaling-job-schedule-%d"));
+    
+    private static final Map<String, Map<Integer, RuleAlteredJobScheduler>> JOB_SCHEDULER_MAP = new ConcurrentHashMap<>();

Review Comment:
   It would be better to keep `JOB_SCHEDULER_MAP` in `RuleAlteredJobSchedulerCenter`; `RuleAlteredJobPersistService` could then handle only persistence.
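
A rough sketch of that separation, under the assumption that the persist service receives a lookup function instead of owning the scheduler map. Both class names are hypothetical, and a `String` stands in for `RuleAlteredJobScheduler` so the example is self-contained.

```java
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.BiFunction;

// Hypothetical stand-in for the scheduler center: it alone owns the scheduler map.
final class SchedulerRegistrySketch {

    private final Map<String, Map<Integer, String>> schedulers = new ConcurrentHashMap<>();

    void add(final String jobId, final int shardingItem, final String scheduler) {
        schedulers.computeIfAbsent(jobId, key -> new ConcurrentHashMap<>()).put(shardingItem, scheduler);
    }

    Optional<String> get(final String jobId, final int shardingItem) {
        // Plain get() on the read path, so lookups never insert empty inner maps.
        Map<Integer, String> byItem = schedulers.get(jobId);
        return null == byItem ? Optional.empty() : Optional.ofNullable(byItem.get(shardingItem));
    }
}

// Hypothetical persist-only service: it looks schedulers up, but does not store them.
final class PersistOnlyServiceSketch {

    private final BiFunction<String, Integer, Optional<String>> schedulerLookup;

    PersistOnlyServiceSketch(final BiFunction<String, Integer, Optional<String>> schedulerLookup) {
        this.schedulerLookup = schedulerLookup;
    }

    // Returns true if a scheduler was found; a real implementation would
    // write that scheduler's job context to the repository at this point.
    boolean persist(final String jobId, final int shardingItem) {
        return schedulerLookup.apply(jobId, shardingItem).isPresent();
    }
}
```

The service could be wired up with `new PersistOnlyServiceSketch(registry::get)`, which keeps the dependency pointing from persistence to the scheduler center and not the other way round.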



##########
shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/job/persist/JobPersistIntervalParameter.java:
##########
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.data.pipeline.api.job.persist;
+
+import lombok.Getter;
+import lombok.RequiredArgsConstructor;
+
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.locks.ReentrantLock;
+
+@Getter
+@RequiredArgsConstructor
+public final class JobPersistIntervalParameter {
+    
+    private final String jobId;
+    
+    private final int shardingItem;
+    
+    private final AtomicLong persistTime = new AtomicLong(System.currentTimeMillis());
+    
+    private final AtomicBoolean alreadyPersisted = new AtomicBoolean(false);
+    
+    private final ReentrantLock lock = new ReentrantLock();

Review Comment:
   This might not be necessary.



##########
shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobPersistService.java:
##########
@@ -0,0 +1,175 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.data.pipeline.scenario.rulealtered;
+
+import lombok.extern.slf4j.Slf4j;
+import org.apache.shardingsphere.data.pipeline.api.job.persist.JobPersistIntervalParameter;
+import org.apache.shardingsphere.data.pipeline.core.api.GovernanceRepositoryAPI;
+import org.apache.shardingsphere.data.pipeline.core.api.PipelineAPIFactory;
+import org.apache.shardingsphere.infra.executor.kernel.thread.ExecutorThreadFactoryBuilder;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Optional;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Executor;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Rule altered job persist service.
+ */
+
+@Slf4j
+public final class RuleAlteredJobPersistService {
+    
+    private static final GovernanceRepositoryAPI REPOSITORY_API = PipelineAPIFactory.getGovernanceRepositoryAPI();
+    
+    private static final Executor SINGLE_EXECUTOR = Executors.newSingleThreadExecutor(ExecutorThreadFactoryBuilder.build("scaling-job-single-%d"));
+    
+    private static final ScheduledExecutorService JOB_PERSIST_EXECUTOR = Executors.newSingleThreadScheduledExecutor(ExecutorThreadFactoryBuilder.build("scaling-job-schedule-%d"));
+    
+    private static final Map<String, Map<Integer, RuleAlteredJobScheduler>> JOB_SCHEDULER_MAP = new ConcurrentHashMap<>();
+    
+    private static final Map<String, Map<Integer, JobPersistIntervalParameter>> JOB_PERSIST_MAP = new ConcurrentHashMap<>();
+    
+    private static final int MIN_PERSIST_INTERVAL_MILLIS = 1000;
+    
+    static {
+        JOB_PERSIST_EXECUTOR.scheduleWithFixedDelay(new PersistJobContextRunnable(), 5, 2, TimeUnit.SECONDS);
+    }
+    
+    /**
+     * Get job schedule.
+     *
+     * @param jobId job id
+     * @param shardingItem sharding item
+     * @return job schedule
+     */
+    public static Optional<RuleAlteredJobScheduler> getJobSchedule(final String jobId, final int shardingItem) {
+        Map<Integer, RuleAlteredJobScheduler> schedulerMap = JOB_SCHEDULER_MAP.computeIfAbsent(jobId, key -> new ConcurrentHashMap<>());
+        return Optional.ofNullable(schedulerMap.get(shardingItem));
+    }
+    
+    /**
+     * List job schedules.
+     *
+     * @param jobId job id
+     * @return job schedules belong this job id
+     */
+    public static Collection<RuleAlteredJobScheduler> listJobSchedule(final String jobId) {
+        Map<Integer, RuleAlteredJobScheduler> schedulerMap = JOB_SCHEDULER_MAP.computeIfAbsent(jobId, key -> new ConcurrentHashMap<>());
+        return schedulerMap.values();
+    }
+    
+    /**
+     * Remove job schedule map by job id.
+     *
+     * @param jobId job id
+     */
+    public static void removeJobSchedule(final String jobId) {
+        log.info("Remove job schedule by job id: {}", jobId);
+        JOB_SCHEDULER_MAP.remove(jobId);
+        JOB_PERSIST_MAP.remove(jobId);
+    }
+    
+    /**
+     * Add job schedule.
+     *
+     * @param jobId job id
+     * @param shardingItem sharding item
+     * @param jobScheduler job schedule
+     */
+    public static void addJobSchedule(final String jobId, final int shardingItem, final RuleAlteredJobScheduler jobScheduler) {
+        log.info("Add job schedule, jobId={}, shardingItem={}", jobId, shardingItem);
+        JOB_SCHEDULER_MAP.computeIfAbsent(jobId, key -> new ConcurrentHashMap<>()).put(shardingItem, jobScheduler);
+        JOB_PERSIST_MAP.computeIfAbsent(jobId, key -> new ConcurrentHashMap<>()).put(shardingItem, new JobPersistIntervalParameter(jobId, shardingItem));
+    }
+    
+    /**
+     * Persist job process, may not be implemented immediately, depending on persist interval.
+     *
+     * @param jobId job id
+     * @param shardingItem sharding item
+     * @param currentTimeMills current time mills
+     */
+    public static void triggerPersist(final String jobId, final int shardingItem, final long currentTimeMills) {
+        Map<Integer, JobPersistIntervalParameter> intervalParamMap = JOB_PERSIST_MAP.getOrDefault(jobId, Collections.emptyMap());
+        JobPersistIntervalParameter parameter = intervalParamMap.get(shardingItem);
+        if (null == parameter) {
+            log.debug("Persist interval parameter is null, jobId={}, shardingItem={}", jobId, shardingItem);
+            return;
+        }
+        parameter.getAlreadyPersisted().compareAndSet(true, false);
+        if ((currentTimeMills - parameter.getPersistTime().get()) < MIN_PERSIST_INTERVAL_MILLIS) {
+            return;
+        }
+        if (parameter.getLock().tryLock()) {
+            try {
+                SINGLE_EXECUTOR.execute(() -> persist(jobId, shardingItem, currentTimeMills, parameter));
+                // CHECKSTYLE:OFF
+            } catch (final RuntimeException ex) {
+                // CHECKSTYLE:ON
+                log.error("Persist job process failed, job id: {}, sharding item: {}", jobId, shardingItem, ex);
+            } finally {
+                parameter.getLock().unlock();
+            }
+        }
+    }
+    
+    private static void persist(final String jobId, final int shardingItem, final long persistTimeMillis, final JobPersistIntervalParameter param) {
+        Optional<RuleAlteredJobScheduler> jobSchedule = getJobSchedule(jobId, shardingItem);
+        if (!jobSchedule.isPresent()) {
+            log.warn("job schedule not exists, job id: {}, sharding item: {}", jobId, shardingItem);
+            return;
+        }
+        log.info("execute persist, job id={}, sharding item={}, persistTimeMillis={}", jobId, shardingItem, persistTimeMillis);
+        REPOSITORY_API.persistJobProgress(jobSchedule.get().getJobContext());
+        param.getPersistTime().set(persistTimeMillis);
+        param.getAlreadyPersisted().set(true);
+    }
+    
+    private static final class PersistJobContextRunnable implements Runnable {
+        
+        @Override
+        public void run() {
+            long currentTimeMillis = System.currentTimeMillis();
+            for (Entry<String, Map<Integer, JobPersistIntervalParameter>> entry : JOB_PERSIST_MAP.entrySet()) {
+                entry.getValue().forEach((shardingItem, param) -> {
+                    if (param.getAlreadyPersisted().get() || currentTimeMillis - param.getPersistTime().get() < MIN_PERSIST_INTERVAL_MILLIS) {
+                        return;
+                    }
+                    boolean tryLock = false;
+                    try {
+                        tryLock = param.getLock().tryLock();
+                        if (tryLock) {
+                            persist(entry.getKey(), shardingItem, currentTimeMillis, param);
+                        }
+                    } finally {
+                        if (tryLock) {
+                            param.getLock().unlock();
+                        }
+                    }

Review Comment:
   `tryLock` might not be necessary anymore if `triggerPersist` no longer performs the persistence itself.
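
To illustrate the point of this comment: if `triggerPersist` only flips a flag and a single scheduler thread is the sole caller of the persist step, the two paths never compete for the same work, so a lock is not required. A minimal sketch under that assumption follows. `DirtyFlagPersistSketch`, `register`, `persistDirtyItems` and `getPersistCount` are hypothetical names for illustration, and the counter stands in for the real `persistJobProgress` call.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for the persist service; not the ShardingSphere class.
final class DirtyFlagPersistSketch {

    // true means "nothing new to persist", mirroring alreadyPersisted in the diff.
    private final Map<Integer, AtomicBoolean> alreadyPersisted = new ConcurrentHashMap<>();

    // Counts persist operations; a placeholder for the real repository write.
    private final AtomicInteger persistCount = new AtomicInteger();

    void register(final int shardingItem) {
        alreadyPersisted.put(shardingItem, new AtomicBoolean(true));
    }

    // May be called from any worker thread: only marks the item dirty.
    void triggerPersist(final int shardingItem) {
        AtomicBoolean flag = alreadyPersisted.get(shardingItem);
        if (null != flag) {
            flag.set(false);
        }
    }

    // Assumed to be called from one scheduler thread only, so no lock is taken.
    void persistDirtyItems() {
        alreadyPersisted.forEach((shardingItem, flag) -> {
            // Reset the flag before writing, so a trigger that arrives
            // during the write marks the item dirty again and is not lost.
            if (flag.compareAndSet(false, true)) {
                persistCount.incrementAndGet();
            }
        });
    }

    int getPersistCount() {
        return persistCount.get();
    }
}
```

In this sketch, several triggers between two scheduler runs collapse into one persist, which is the throttling effect the interval check in the original hunk was aiming for.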





[GitHub] [shardingsphere] sandynz commented on a diff in pull request #19695: Extract job progress persistence from RuleAlteredJobSchedulerCenter

Posted by GitBox <gi...@apache.org>.
sandynz commented on code in PR #19695:
URL: https://github.com/apache/shardingsphere/pull/19695#discussion_r933723943


##########
shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/job/persist/JobPersistIntervalParameter.java:
##########
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.data.pipeline.api.job.persist;
+
+import lombok.Getter;
+import lombok.RequiredArgsConstructor;
+
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+
+@Getter
+@RequiredArgsConstructor
+public final class JobPersistIntervalParameter {

Review Comment:
   Could `JobPersistIntervalParameter` be renamed to `PipelineJobPersistContext`?



##########
shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/job/persist/JobPersistIntervalParameter.java:
##########
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.data.pipeline.api.job.persist;
+
+import lombok.Getter;
+import lombok.RequiredArgsConstructor;
+
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+
+@Getter
+@RequiredArgsConstructor
+public final class JobPersistIntervalParameter {
+    
+    private final String jobId;
+    
+    private final int shardingItem;
+    
+    private final AtomicLong persistTime = new AtomicLong(System.currentTimeMillis());

Review Comment:
   It looks like `persistTime` is no longer read.



##########
shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobPersistService.java:
##########
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.data.pipeline.scenario.rulealtered;
+
+import lombok.extern.slf4j.Slf4j;
+import org.apache.shardingsphere.data.pipeline.api.job.persist.JobPersistIntervalParameter;
+import org.apache.shardingsphere.data.pipeline.core.api.GovernanceRepositoryAPI;
+import org.apache.shardingsphere.data.pipeline.core.api.PipelineAPIFactory;
+import org.apache.shardingsphere.infra.executor.kernel.thread.ExecutorThreadFactoryBuilder;
+
+import java.util.Collections;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+
+/**
+ * Rule altered job persist service.
+ */
+
+@Slf4j
+public final class RuleAlteredJobPersistService {
+    
+    private static final Map<String, Map<Integer, JobPersistIntervalParameter>> JOB_PERSIST_MAP = new ConcurrentHashMap<>();
+    
+    private static final GovernanceRepositoryAPI REPOSITORY_API = PipelineAPIFactory.getGovernanceRepositoryAPI();
+    
+    private static final ScheduledExecutorService JOB_PERSIST_EXECUTOR = Executors.newSingleThreadScheduledExecutor(ExecutorThreadFactoryBuilder.build("scaling-job-schedule-%d"));
+    
+    static {
+        JOB_PERSIST_EXECUTOR.scheduleWithFixedDelay(new PersistJobContextRunnable(), 5, 1, TimeUnit.SECONDS);
+    }
+    
+    /**
+     * Remove job schedule map by job id.
+     *
+     * @param jobId job id
+     */
+    public static void removeJobPersistParameter(final String jobId) {
+        log.info("Remove job persist, job id: {}", jobId);
+        JOB_PERSIST_MAP.remove(jobId);
+    }
+    
+    /**
+     * Add job schedule.
+     *
+     * @param jobId job id
+     * @param shardingItem sharding item
+     */
+    public static void addJobPersistParameter(final String jobId, final int shardingItem) {
+        log.info("Add job schedule, jobId={}, shardingItem={}", jobId, shardingItem);
+        JOB_PERSIST_MAP.computeIfAbsent(jobId, key -> new ConcurrentHashMap<>()).put(shardingItem, new JobPersistIntervalParameter(jobId, shardingItem));
+    }
+    
+    /**
+     * Persist job process, may not be implemented immediately, depending on persist interval.
+     *
+     * @param jobId job id
+     * @param shardingItem sharding item
+     */
+    public static void triggerPersist(final String jobId, final int shardingItem) {
+        Map<Integer, JobPersistIntervalParameter> intervalParamMap = JOB_PERSIST_MAP.getOrDefault(jobId, Collections.emptyMap());
+        JobPersistIntervalParameter parameter = intervalParamMap.get(shardingItem);
+        if (null == parameter) {
+            log.debug("Persist interval parameter is null, jobId={}, shardingItem={}", jobId, shardingItem);
+            return;
+        }
+        parameter.getAlreadyPersisted().compareAndSet(true, false);
+    }
+    
+    private static void persist(final String jobId, final int shardingItem, final long persistTimeMillis, final JobPersistIntervalParameter param) {
+        Map<Integer, RuleAlteredJobScheduler> schedulerMap = RuleAlteredJobSchedulerCenter.JOB_SCHEDULER_MAP.computeIfAbsent(jobId, key -> new ConcurrentHashMap<>());
+        RuleAlteredJobScheduler scheduler = schedulerMap.get(shardingItem);
+        if (scheduler == null) {

Review Comment:
   1. It would be better to provide a method in `RuleAlteredJobSchedulerCenter` for accessing the required `RuleAlteredJobScheduler`.
   
   2. `scheduler == null` could be `null == scheduler`.
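
A rough sketch of what the first suggestion could look like: the map stays private and callers go through an accessor that returns an `Optional`, in the same spirit as the `getJobSchedule(jobId, shardingItem)` call visible in the earlier hunk. `SchedulerCenterSketch`, `Scheduler`, `addJobScheduler` and `getJobScheduler` are hypothetical stand-ins, not the project's actual classes or signatures.

```java
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical stand-in for RuleAlteredJobSchedulerCenter.
final class SchedulerCenterSketch {

    // Hypothetical placeholder for RuleAlteredJobScheduler.
    static final class Scheduler {

        private final String name;

        Scheduler(final String name) {
            this.name = name;
        }

        String getName() {
            return name;
        }
    }

    // Kept private so other classes cannot reach into the map directly.
    private static final Map<String, Map<Integer, Scheduler>> JOB_SCHEDULER_MAP = new ConcurrentHashMap<>();

    static void addJobScheduler(final String jobId, final int shardingItem, final Scheduler scheduler) {
        JOB_SCHEDULER_MAP.computeIfAbsent(jobId, key -> new ConcurrentHashMap<>()).put(shardingItem, scheduler);
    }

    // Read-only lookup: uses get(), so a miss does not create an empty inner map.
    static Optional<Scheduler> getJobScheduler(final String jobId, final int shardingItem) {
        Map<Integer, Scheduler> schedulerMap = JOB_SCHEDULER_MAP.get(jobId);
        return null == schedulerMap ? Optional.empty() : Optional.ofNullable(schedulerMap.get(shardingItem));
    }
}
```

One side effect worth noting: the `computeIfAbsent` call in the reviewed `persist` method inserts an empty map for an unknown job id on every lookup, whereas an accessor based on `get` leaves the map untouched.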
   





[GitHub] [shardingsphere] sandynz merged pull request #19695: Extract job progress persistence from RuleAlteredJobSchedulerCenter

Posted by GitBox <gi...@apache.org>.
sandynz merged PR #19695:
URL: https://github.com/apache/shardingsphere/pull/19695

