You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@shardingsphere.apache.org by pa...@apache.org on 2022/12/26 12:36:00 UTC
[shardingsphere] branch master updated: Refactor PipelineChangedJobConfigurationProcessor.process parameter (#23107)
This is an automated email from the ASF dual-hosted git repository.
panjuan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new cc8f2ecc543 Refactor PipelineChangedJobConfigurationProcessor.process parameter (#23107)
cc8f2ecc543 is described below
commit cc8f2ecc5430b55f89c0a145cebc9f1a08eefe2d
Author: Hongsheng Zhong <zh...@apache.org>
AuthorDate: Mon Dec 26 20:35:54 2022 +0800
Refactor PipelineChangedJobConfigurationProcessor.process parameter (#23107)
---
.../PipelineChangedJobConfigurationProcessor.java | 10 ++++------
.../impl/ChangedJobConfigurationDispatcher.java | 11 ++++++-----
...nsistencyCheckChangedJobConfigurationProcessor.java | 18 +++++++++---------
.../MigrationChangedJobConfigurationProcessor.java | 18 +++++++++---------
4 files changed, 28 insertions(+), 29 deletions(-)
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/node/event/handler/PipelineChangedJobConfigurationProcessor.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/node/event/handler/PipelineChangedJobConfigurationProcessor.java
index 59ad2e99a0c..608f64143d6 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/node/event/handler/PipelineChangedJobConfigurationProcessor.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/node/event/handler/PipelineChangedJobConfigurationProcessor.java
@@ -17,22 +17,20 @@
package org.apache.shardingsphere.data.pipeline.core.metadata.node.event.handler;
-import org.apache.shardingsphere.elasticjob.infra.pojo.JobConfigurationPOJO;
+import org.apache.shardingsphere.elasticjob.api.JobConfiguration;
import org.apache.shardingsphere.infra.util.spi.type.typed.TypedSPI;
-import org.apache.shardingsphere.mode.repository.cluster.listener.DataChangedEvent;
+import org.apache.shardingsphere.mode.repository.cluster.listener.DataChangedEvent.Type;
/**
* Pipeline changed job configuration processor.
*/
-// TODO rename to PipelineJobConfigurationChangedProcessor
public interface PipelineChangedJobConfigurationProcessor extends TypedSPI {
/**
* Process changed job configuration.
*
* @param eventType event type
- * @param jobConfigPOJO job configuration pojo
+ * @param jobConfig job configuration
*/
- // TODO replace JobConfigurationPOJO to JobConfiguration
- void process(DataChangedEvent.Type eventType, JobConfigurationPOJO jobConfigPOJO);
+ void process(Type eventType, JobConfiguration jobConfig);
}
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/node/event/handler/impl/ChangedJobConfigurationDispatcher.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/node/event/handler/impl/ChangedJobConfigurationDispatcher.java
index 95ca1d43c33..02bcd1c94c0 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/node/event/handler/impl/ChangedJobConfigurationDispatcher.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/node/event/handler/impl/ChangedJobConfigurationDispatcher.java
@@ -22,6 +22,7 @@ import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobIdUtils;
import org.apache.shardingsphere.data.pipeline.core.metadata.node.PipelineMetaDataNode;
import org.apache.shardingsphere.data.pipeline.core.metadata.node.event.handler.PipelineChangedJobConfigurationProcessorFactory;
import org.apache.shardingsphere.data.pipeline.core.metadata.node.event.handler.PipelineMetaDataChangedEventHandler;
+import org.apache.shardingsphere.elasticjob.api.JobConfiguration;
import org.apache.shardingsphere.elasticjob.infra.pojo.JobConfigurationPOJO;
import org.apache.shardingsphere.infra.util.yaml.YamlEngine;
import org.apache.shardingsphere.mode.repository.cluster.listener.DataChangedEvent;
@@ -41,17 +42,17 @@ public final class ChangedJobConfigurationDispatcher implements PipelineMetaData
@Override
public void handle(final DataChangedEvent event) {
- JobConfigurationPOJO jobConfigPOJO;
+ JobConfiguration jobConfig;
try {
- jobConfigPOJO = YamlEngine.unmarshal(event.getValue(), JobConfigurationPOJO.class, true);
+ jobConfig = YamlEngine.unmarshal(event.getValue(), JobConfigurationPOJO.class, true).toJobConfiguration();
// CHECKSTYLE:OFF
} catch (final Exception ex) {
// CHECKSTYLE:ON
log.error("unmarshal job configuration pojo failed.", ex);
return;
}
- log.info("{} job configuration: {}, disabled={}", event.getType(), event.getKey(), jobConfigPOJO.isDisabled());
- PipelineChangedJobConfigurationProcessorFactory.findInstance(PipelineJobIdUtils.parseJobType(jobConfigPOJO.getJobName()))
- .ifPresent(processor -> processor.process(event.getType(), jobConfigPOJO));
+ log.info("{} job configuration: {}, disabled={}", event.getType(), event.getKey(), jobConfig.isDisabled());
+ PipelineChangedJobConfigurationProcessorFactory.findInstance(PipelineJobIdUtils.parseJobType(jobConfig.getJobName()))
+ .ifPresent(processor -> processor.process(event.getType(), jobConfig));
}
}
diff --git a/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/metadata/processor/ConsistencyCheckChangedJobConfigurationProcessor.java b/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/metadata/processor/ConsistencyCheckChangedJobConfigurationProcessor.java
index d4c9978ebfd..bea833d9879 100644
--- a/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/metadata/processor/ConsistencyCheckChangedJobConfigurationProcessor.java
+++ b/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/metadata/processor/ConsistencyCheckChangedJobConfigurationProcessor.java
@@ -26,9 +26,9 @@ import org.apache.shardingsphere.data.pipeline.core.metadata.node.PipelineMetaDa
import org.apache.shardingsphere.data.pipeline.core.metadata.node.event.handler.PipelineChangedJobConfigurationProcessor;
import org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.ConsistencyCheckJob;
import org.apache.shardingsphere.data.pipeline.spi.barrier.PipelineDistributedBarrierFactory;
-import org.apache.shardingsphere.elasticjob.infra.pojo.JobConfigurationPOJO;
+import org.apache.shardingsphere.elasticjob.api.JobConfiguration;
import org.apache.shardingsphere.elasticjob.lite.api.bootstrap.impl.OneOffJobBootstrap;
-import org.apache.shardingsphere.mode.repository.cluster.listener.DataChangedEvent;
+import org.apache.shardingsphere.mode.repository.cluster.listener.DataChangedEvent.Type;
import java.util.Collection;
import java.util.concurrent.CompletableFuture;
@@ -40,9 +40,9 @@ import java.util.concurrent.CompletableFuture;
public final class ConsistencyCheckChangedJobConfigurationProcessor implements PipelineChangedJobConfigurationProcessor {
@Override
- public void process(final DataChangedEvent.Type eventType, final JobConfigurationPOJO jobConfigPOJO) {
- String jobId = jobConfigPOJO.getJobName();
- if (jobConfigPOJO.isDisabled()) {
+ public void process(final Type eventType, final JobConfiguration jobConfig) {
+ String jobId = jobConfig.getJobName();
+ if (jobConfig.isDisabled()) {
Collection<Integer> shardingItems = PipelineJobCenter.getShardingItems(jobId);
PipelineJobCenter.stop(jobId);
for (Integer each : shardingItems) {
@@ -56,7 +56,7 @@ public final class ConsistencyCheckChangedJobConfigurationProcessor implements P
if (PipelineJobCenter.isJobExisting(jobId)) {
log.info("{} added to executing jobs failed since it already exists", jobId);
} else {
- CompletableFuture.runAsync(() -> execute(jobConfigPOJO), PipelineContext.getEventListenerExecutor()).whenComplete((unused, throwable) -> {
+ CompletableFuture.runAsync(() -> execute(jobConfig), PipelineContext.getEventListenerExecutor()).whenComplete((unused, throwable) -> {
if (null != throwable) {
log.error("execute failed, jobId={}", jobId, throwable);
}
@@ -71,10 +71,10 @@ public final class ConsistencyCheckChangedJobConfigurationProcessor implements P
}
}
- private void execute(final JobConfigurationPOJO jobConfigPOJO) {
+ private void execute(final JobConfiguration jobConfig) {
ConsistencyCheckJob job = new ConsistencyCheckJob();
- PipelineJobCenter.addJob(jobConfigPOJO.getJobName(), job);
- OneOffJobBootstrap oneOffJobBootstrap = new OneOffJobBootstrap(PipelineAPIFactory.getRegistryCenter(), job, jobConfigPOJO.toJobConfiguration());
+ PipelineJobCenter.addJob(jobConfig.getJobName(), job);
+ OneOffJobBootstrap oneOffJobBootstrap = new OneOffJobBootstrap(PipelineAPIFactory.getRegistryCenter(), job, jobConfig);
job.setJobBootstrap(oneOffJobBootstrap);
oneOffJobBootstrap.execute();
}
diff --git a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/metadata/processor/MigrationChangedJobConfigurationProcessor.java b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/metadata/processor/MigrationChangedJobConfigurationProcessor.java
index 83d6a791561..a7dbe0c31e3 100644
--- a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/metadata/processor/MigrationChangedJobConfigurationProcessor.java
+++ b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/metadata/processor/MigrationChangedJobConfigurationProcessor.java
@@ -29,7 +29,7 @@ import org.apache.shardingsphere.data.pipeline.scenario.migration.prepare.Migrat
import org.apache.shardingsphere.data.pipeline.spi.barrier.PipelineDistributedBarrier;
import org.apache.shardingsphere.data.pipeline.spi.barrier.PipelineDistributedBarrierFactory;
import org.apache.shardingsphere.data.pipeline.yaml.job.YamlMigrationJobConfigurationSwapper;
-import org.apache.shardingsphere.elasticjob.infra.pojo.JobConfigurationPOJO;
+import org.apache.shardingsphere.elasticjob.api.JobConfiguration;
import org.apache.shardingsphere.elasticjob.lite.api.bootstrap.impl.OneOffJobBootstrap;
import org.apache.shardingsphere.mode.repository.cluster.listener.DataChangedEvent.Type;
@@ -43,9 +43,9 @@ import java.util.concurrent.CompletableFuture;
public final class MigrationChangedJobConfigurationProcessor implements PipelineChangedJobConfigurationProcessor {
@Override
- public void process(final Type eventType, final JobConfigurationPOJO jobConfigPOJO) {
- String jobId = jobConfigPOJO.getJobName();
- if (jobConfigPOJO.isDisabled()) {
+ public void process(final Type eventType, final JobConfiguration jobConfig) {
+ String jobId = jobConfig.getJobName();
+ if (jobConfig.isDisabled()) {
Collection<Integer> shardingItems = PipelineJobCenter.getShardingItems(jobId);
PipelineJobCenter.stop(jobId);
PipelineDistributedBarrier pipelineDistributedBarrier = PipelineDistributedBarrierFactory.getInstance();
@@ -60,7 +60,7 @@ public final class MigrationChangedJobConfigurationProcessor implements Pipeline
if (PipelineJobCenter.isJobExisting(jobId)) {
log.info("{} added to executing jobs failed since it already exists", jobId);
} else {
- CompletableFuture.runAsync(() -> execute(jobConfigPOJO), PipelineContext.getEventListenerExecutor()).whenComplete((unused, throwable) -> {
+ CompletableFuture.runAsync(() -> execute(jobConfig), PipelineContext.getEventListenerExecutor()).whenComplete((unused, throwable) -> {
if (null != throwable) {
log.error("execute failed, jobId={}", jobId, throwable);
}
@@ -68,7 +68,7 @@ public final class MigrationChangedJobConfigurationProcessor implements Pipeline
}
break;
case DELETED:
- new MigrationJobPreparer().cleanup(new YamlMigrationJobConfigurationSwapper().swapToObject(jobConfigPOJO.getJobParameter()));
+ new MigrationJobPreparer().cleanup(new YamlMigrationJobConfigurationSwapper().swapToObject(jobConfig.getJobParameter()));
PipelineJobCenter.stop(jobId);
break;
default:
@@ -76,10 +76,10 @@ public final class MigrationChangedJobConfigurationProcessor implements Pipeline
}
}
- private void execute(final JobConfigurationPOJO jobConfigPOJO) {
+ private void execute(final JobConfiguration jobConfig) {
MigrationJob job = new MigrationJob();
- PipelineJobCenter.addJob(jobConfigPOJO.getJobName(), job);
- OneOffJobBootstrap oneOffJobBootstrap = new OneOffJobBootstrap(PipelineAPIFactory.getRegistryCenter(), job, jobConfigPOJO.toJobConfiguration());
+ PipelineJobCenter.addJob(jobConfig.getJobName(), job);
+ OneOffJobBootstrap oneOffJobBootstrap = new OneOffJobBootstrap(PipelineAPIFactory.getRegistryCenter(), job, jobConfig);
job.setJobBootstrap(oneOffJobBootstrap);
oneOffJobBootstrap.execute();
}