You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@shardingsphere.apache.org by pa...@apache.org on 2022/12/26 12:36:00 UTC

[shardingsphere] branch master updated: Refactor PipelineChangedJobConfigurationProcessor.process parameter (#23107)

This is an automated email from the ASF dual-hosted git repository.

panjuan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new cc8f2ecc543 Refactor PipelineChangedJobConfigurationProcessor.process parameter (#23107)
cc8f2ecc543 is described below

commit cc8f2ecc5430b55f89c0a145cebc9f1a08eefe2d
Author: Hongsheng Zhong <zh...@apache.org>
AuthorDate: Mon Dec 26 20:35:54 2022 +0800

    Refactor PipelineChangedJobConfigurationProcessor.process parameter (#23107)
---
 .../PipelineChangedJobConfigurationProcessor.java      | 10 ++++------
 .../impl/ChangedJobConfigurationDispatcher.java        | 11 ++++++-----
 ...nsistencyCheckChangedJobConfigurationProcessor.java | 18 +++++++++---------
 .../MigrationChangedJobConfigurationProcessor.java     | 18 +++++++++---------
 4 files changed, 28 insertions(+), 29 deletions(-)

diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/node/event/handler/PipelineChangedJobConfigurationProcessor.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/node/event/handler/PipelineChangedJobConfigurationProcessor.java
index 59ad2e99a0c..608f64143d6 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/node/event/handler/PipelineChangedJobConfigurationProcessor.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/node/event/handler/PipelineChangedJobConfigurationProcessor.java
@@ -17,22 +17,20 @@
 
 package org.apache.shardingsphere.data.pipeline.core.metadata.node.event.handler;
 
-import org.apache.shardingsphere.elasticjob.infra.pojo.JobConfigurationPOJO;
+import org.apache.shardingsphere.elasticjob.api.JobConfiguration;
 import org.apache.shardingsphere.infra.util.spi.type.typed.TypedSPI;
-import org.apache.shardingsphere.mode.repository.cluster.listener.DataChangedEvent;
+import org.apache.shardingsphere.mode.repository.cluster.listener.DataChangedEvent.Type;
 
 /**
  * Pipeline changed job configuration processor.
  */
-// TODO rename to PipelineJobConfigurationChangedProcessor
 public interface PipelineChangedJobConfigurationProcessor extends TypedSPI {
     
     /**
      * Process changed job configuration.
      *
      * @param eventType event type
-     * @param jobConfigPOJO job configuration pojo
+     * @param jobConfig job configuration
      */
-    // TODO replace JobConfigurationPOJO to JobConfiguration
-    void process(DataChangedEvent.Type eventType, JobConfigurationPOJO jobConfigPOJO);
+    void process(Type eventType, JobConfiguration jobConfig);
 }
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/node/event/handler/impl/ChangedJobConfigurationDispatcher.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/node/event/handler/impl/ChangedJobConfigurationDispatcher.java
index 95ca1d43c33..02bcd1c94c0 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/node/event/handler/impl/ChangedJobConfigurationDispatcher.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/node/event/handler/impl/ChangedJobConfigurationDispatcher.java
@@ -22,6 +22,7 @@ import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobIdUtils;
 import org.apache.shardingsphere.data.pipeline.core.metadata.node.PipelineMetaDataNode;
 import org.apache.shardingsphere.data.pipeline.core.metadata.node.event.handler.PipelineChangedJobConfigurationProcessorFactory;
 import org.apache.shardingsphere.data.pipeline.core.metadata.node.event.handler.PipelineMetaDataChangedEventHandler;
+import org.apache.shardingsphere.elasticjob.api.JobConfiguration;
 import org.apache.shardingsphere.elasticjob.infra.pojo.JobConfigurationPOJO;
 import org.apache.shardingsphere.infra.util.yaml.YamlEngine;
 import org.apache.shardingsphere.mode.repository.cluster.listener.DataChangedEvent;
@@ -41,17 +42,17 @@ public final class ChangedJobConfigurationDispatcher implements PipelineMetaData
     
     @Override
     public void handle(final DataChangedEvent event) {
-        JobConfigurationPOJO jobConfigPOJO;
+        JobConfiguration jobConfig;
         try {
-            jobConfigPOJO = YamlEngine.unmarshal(event.getValue(), JobConfigurationPOJO.class, true);
+            jobConfig = YamlEngine.unmarshal(event.getValue(), JobConfigurationPOJO.class, true).toJobConfiguration();
             // CHECKSTYLE:OFF
         } catch (final Exception ex) {
             // CHECKSTYLE:ON
             log.error("unmarshal job configuration pojo failed.", ex);
             return;
         }
-        log.info("{} job configuration: {}, disabled={}", event.getType(), event.getKey(), jobConfigPOJO.isDisabled());
-        PipelineChangedJobConfigurationProcessorFactory.findInstance(PipelineJobIdUtils.parseJobType(jobConfigPOJO.getJobName()))
-                .ifPresent(processor -> processor.process(event.getType(), jobConfigPOJO));
+        log.info("{} job configuration: {}, disabled={}", event.getType(), event.getKey(), jobConfig.isDisabled());
+        PipelineChangedJobConfigurationProcessorFactory.findInstance(PipelineJobIdUtils.parseJobType(jobConfig.getJobName()))
+                .ifPresent(processor -> processor.process(event.getType(), jobConfig));
     }
 }
diff --git a/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/metadata/processor/ConsistencyCheckChangedJobConfigurationProcessor.java b/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/metadata/processor/ConsistencyCheckChangedJobConfigurationProcessor.java
index d4c9978ebfd..bea833d9879 100644
--- a/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/metadata/processor/ConsistencyCheckChangedJobConfigurationProcessor.java
+++ b/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/metadata/processor/ConsistencyCheckChangedJobConfigurationProcessor.java
@@ -26,9 +26,9 @@ import org.apache.shardingsphere.data.pipeline.core.metadata.node.PipelineMetaDa
 import org.apache.shardingsphere.data.pipeline.core.metadata.node.event.handler.PipelineChangedJobConfigurationProcessor;
 import org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.ConsistencyCheckJob;
 import org.apache.shardingsphere.data.pipeline.spi.barrier.PipelineDistributedBarrierFactory;
-import org.apache.shardingsphere.elasticjob.infra.pojo.JobConfigurationPOJO;
+import org.apache.shardingsphere.elasticjob.api.JobConfiguration;
 import org.apache.shardingsphere.elasticjob.lite.api.bootstrap.impl.OneOffJobBootstrap;
-import org.apache.shardingsphere.mode.repository.cluster.listener.DataChangedEvent;
+import org.apache.shardingsphere.mode.repository.cluster.listener.DataChangedEvent.Type;
 
 import java.util.Collection;
 import java.util.concurrent.CompletableFuture;
@@ -40,9 +40,9 @@ import java.util.concurrent.CompletableFuture;
 public final class ConsistencyCheckChangedJobConfigurationProcessor implements PipelineChangedJobConfigurationProcessor {
     
     @Override
-    public void process(final DataChangedEvent.Type eventType, final JobConfigurationPOJO jobConfigPOJO) {
-        String jobId = jobConfigPOJO.getJobName();
-        if (jobConfigPOJO.isDisabled()) {
+    public void process(final Type eventType, final JobConfiguration jobConfig) {
+        String jobId = jobConfig.getJobName();
+        if (jobConfig.isDisabled()) {
             Collection<Integer> shardingItems = PipelineJobCenter.getShardingItems(jobId);
             PipelineJobCenter.stop(jobId);
             for (Integer each : shardingItems) {
@@ -56,7 +56,7 @@ public final class ConsistencyCheckChangedJobConfigurationProcessor implements P
                 if (PipelineJobCenter.isJobExisting(jobId)) {
                     log.info("{} added to executing jobs failed since it already exists", jobId);
                 } else {
-                    CompletableFuture.runAsync(() -> execute(jobConfigPOJO), PipelineContext.getEventListenerExecutor()).whenComplete((unused, throwable) -> {
+                    CompletableFuture.runAsync(() -> execute(jobConfig), PipelineContext.getEventListenerExecutor()).whenComplete((unused, throwable) -> {
                         if (null != throwable) {
                             log.error("execute failed, jobId={}", jobId, throwable);
                         }
@@ -71,10 +71,10 @@ public final class ConsistencyCheckChangedJobConfigurationProcessor implements P
         }
     }
     
-    private void execute(final JobConfigurationPOJO jobConfigPOJO) {
+    private void execute(final JobConfiguration jobConfig) {
         ConsistencyCheckJob job = new ConsistencyCheckJob();
-        PipelineJobCenter.addJob(jobConfigPOJO.getJobName(), job);
-        OneOffJobBootstrap oneOffJobBootstrap = new OneOffJobBootstrap(PipelineAPIFactory.getRegistryCenter(), job, jobConfigPOJO.toJobConfiguration());
+        PipelineJobCenter.addJob(jobConfig.getJobName(), job);
+        OneOffJobBootstrap oneOffJobBootstrap = new OneOffJobBootstrap(PipelineAPIFactory.getRegistryCenter(), job, jobConfig);
         job.setJobBootstrap(oneOffJobBootstrap);
         oneOffJobBootstrap.execute();
     }
diff --git a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/metadata/processor/MigrationChangedJobConfigurationProcessor.java b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/metadata/processor/MigrationChangedJobConfigurationProcessor.java
index 83d6a791561..a7dbe0c31e3 100644
--- a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/metadata/processor/MigrationChangedJobConfigurationProcessor.java
+++ b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/metadata/processor/MigrationChangedJobConfigurationProcessor.java
@@ -29,7 +29,7 @@ import org.apache.shardingsphere.data.pipeline.scenario.migration.prepare.Migrat
 import org.apache.shardingsphere.data.pipeline.spi.barrier.PipelineDistributedBarrier;
 import org.apache.shardingsphere.data.pipeline.spi.barrier.PipelineDistributedBarrierFactory;
 import org.apache.shardingsphere.data.pipeline.yaml.job.YamlMigrationJobConfigurationSwapper;
-import org.apache.shardingsphere.elasticjob.infra.pojo.JobConfigurationPOJO;
+import org.apache.shardingsphere.elasticjob.api.JobConfiguration;
 import org.apache.shardingsphere.elasticjob.lite.api.bootstrap.impl.OneOffJobBootstrap;
 import org.apache.shardingsphere.mode.repository.cluster.listener.DataChangedEvent.Type;
 
@@ -43,9 +43,9 @@ import java.util.concurrent.CompletableFuture;
 public final class MigrationChangedJobConfigurationProcessor implements PipelineChangedJobConfigurationProcessor {
     
     @Override
-    public void process(final Type eventType, final JobConfigurationPOJO jobConfigPOJO) {
-        String jobId = jobConfigPOJO.getJobName();
-        if (jobConfigPOJO.isDisabled()) {
+    public void process(final Type eventType, final JobConfiguration jobConfig) {
+        String jobId = jobConfig.getJobName();
+        if (jobConfig.isDisabled()) {
             Collection<Integer> shardingItems = PipelineJobCenter.getShardingItems(jobId);
             PipelineJobCenter.stop(jobId);
             PipelineDistributedBarrier pipelineDistributedBarrier = PipelineDistributedBarrierFactory.getInstance();
@@ -60,7 +60,7 @@ public final class MigrationChangedJobConfigurationProcessor implements Pipeline
                 if (PipelineJobCenter.isJobExisting(jobId)) {
                     log.info("{} added to executing jobs failed since it already exists", jobId);
                 } else {
-                    CompletableFuture.runAsync(() -> execute(jobConfigPOJO), PipelineContext.getEventListenerExecutor()).whenComplete((unused, throwable) -> {
+                    CompletableFuture.runAsync(() -> execute(jobConfig), PipelineContext.getEventListenerExecutor()).whenComplete((unused, throwable) -> {
                         if (null != throwable) {
                             log.error("execute failed, jobId={}", jobId, throwable);
                         }
@@ -68,7 +68,7 @@ public final class MigrationChangedJobConfigurationProcessor implements Pipeline
                 }
                 break;
             case DELETED:
-                new MigrationJobPreparer().cleanup(new YamlMigrationJobConfigurationSwapper().swapToObject(jobConfigPOJO.getJobParameter()));
+                new MigrationJobPreparer().cleanup(new YamlMigrationJobConfigurationSwapper().swapToObject(jobConfig.getJobParameter()));
                 PipelineJobCenter.stop(jobId);
                 break;
             default:
@@ -76,10 +76,10 @@ public final class MigrationChangedJobConfigurationProcessor implements Pipeline
         }
     }
     
-    private void execute(final JobConfigurationPOJO jobConfigPOJO) {
+    private void execute(final JobConfiguration jobConfig) {
         MigrationJob job = new MigrationJob();
-        PipelineJobCenter.addJob(jobConfigPOJO.getJobName(), job);
-        OneOffJobBootstrap oneOffJobBootstrap = new OneOffJobBootstrap(PipelineAPIFactory.getRegistryCenter(), job, jobConfigPOJO.toJobConfiguration());
+        PipelineJobCenter.addJob(jobConfig.getJobName(), job);
+        OneOffJobBootstrap oneOffJobBootstrap = new OneOffJobBootstrap(PipelineAPIFactory.getRegistryCenter(), job, jobConfig);
         job.setJobBootstrap(oneOffJobBootstrap);
         oneOffJobBootstrap.execute();
     }