You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@shardingsphere.apache.org by zh...@apache.org on 2022/09/07 13:30:51 UTC
[shardingsphere] branch master updated: Extract job configuration change event handler SPI and impl in PipelineJobExecutor for common usage (#20782)
This is an automated email from the ASF dual-hosted git repository.
zhonghongsheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new 2e175ba5d78 Extract job configuration change event handler SPI and impl in PipelineJobExecutor for common usage (#20782)
2e175ba5d78 is described below
commit 2e175ba5d780b97d9b0f3ec5e145274da9ce3b51
Author: Xinze Guo <10...@users.noreply.github.com>
AuthorDate: Wed Sep 7 21:30:43 2022 +0800
Extract job configuration change event handler SPI and impl in PipelineJobExecutor for common usage (#20782)
* Extract job configuration change event handler SPI and impl in PipelineJobExecutor for common usage
* Fix codestyle
* Add processor
* Improve comment
---
.../handler/JobConfigurationChangedHandler.java | 48 +++-------------------
...tionJobConfigurationChangedEventProcessor.java} | 32 +++++----------
...elineJobConfigurationChangedEventProcessor.java | 36 ++++++++++++++++
...bConfigurationChangedEventProcessorFactory.java | 45 ++++++++++++++++++++
...s.PipelineJobConfigurationChangedEventProcessor | 18 ++++++++
5 files changed, 115 insertions(+), 64 deletions(-)
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/spi/handler/JobConfigurationChangedHandler.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/spi/handler/JobConfigurationChangedHandler.java
index f2efe9c3bb0..f6e849f3902 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/spi/handler/JobConfigurationChangedHandler.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/spi/handler/JobConfigurationChangedHandler.java
@@ -18,24 +18,18 @@
package org.apache.shardingsphere.data.pipeline.core.spi.handler;
import lombok.extern.slf4j.Slf4j;
-import org.apache.shardingsphere.data.pipeline.api.config.job.MigrationJobConfiguration;
-import org.apache.shardingsphere.data.pipeline.api.config.job.yaml.YamlMigrationJobConfigurationSwapper;
-import org.apache.shardingsphere.data.pipeline.core.api.PipelineAPIFactory;
-import org.apache.shardingsphere.data.pipeline.core.context.PipelineContext;
-import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobCenter;
+import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobIdUtils;
import org.apache.shardingsphere.data.pipeline.core.metadata.node.PipelineMetaDataNode;
-import org.apache.shardingsphere.data.pipeline.scenario.migration.MigrationJob;
-import org.apache.shardingsphere.data.pipeline.scenario.migration.MigrationJobPreparer;
+import org.apache.shardingsphere.data.pipeline.core.spi.process.PipelineJobConfigurationChangedEventProcessor;
+import org.apache.shardingsphere.data.pipeline.core.spi.process.PipelineJobConfigurationChangedEventProcessorFactory;
import org.apache.shardingsphere.elasticjob.infra.pojo.JobConfigurationPOJO;
-import org.apache.shardingsphere.elasticjob.lite.api.bootstrap.impl.OneOffJobBootstrap;
import org.apache.shardingsphere.infra.util.yaml.YamlEngine;
import org.apache.shardingsphere.mode.repository.cluster.listener.DataChangedEvent;
-import java.util.concurrent.CompletableFuture;
import java.util.regex.Pattern;
/**
- * Migration pipeline meta data handler.
+ * Job config changed handler.
*/
@Slf4j
public final class JobConfigurationChangedHandler implements PipelineMetaDataChangedHandler {
@@ -57,37 +51,7 @@ public final class JobConfigurationChangedHandler implements PipelineMetaDataCha
log.error("analyze job config pojo failed.", ex);
return;
}
- String jobId = jobConfigPOJO.getJobName();
- if (jobConfigPOJO.isDisabled()) {
- PipelineJobCenter.stop(jobId);
- return;
- }
- switch (event.getType()) {
- case ADDED:
- case UPDATED:
- if (PipelineJobCenter.isJobExisting(jobConfigPOJO.getJobName())) {
- log.info("{} added to executing jobs failed since it already exists", jobConfigPOJO.getJobName());
- } else {
- log.info("{} executing jobs", jobConfigPOJO.getJobName());
- CompletableFuture.runAsync(() -> execute(jobConfigPOJO), PipelineContext.getEventListenerExecutor());
- }
- break;
- case DELETED:
- log.info("deleted jobId={}", jobId);
- MigrationJobConfiguration jobConfig = YamlMigrationJobConfigurationSwapper.swapToObject(jobConfigPOJO.getJobParameter());
- new MigrationJobPreparer().cleanup(jobConfig);
- PipelineJobCenter.stop(jobConfigPOJO.getJobName());
- break;
- default:
- break;
- }
- }
-
- private void execute(final JobConfigurationPOJO jobConfigPOJO) {
- MigrationJob job = new MigrationJob();
- PipelineJobCenter.addJob(jobConfigPOJO.getJobName(), job);
- OneOffJobBootstrap oneOffJobBootstrap = new OneOffJobBootstrap(PipelineAPIFactory.getRegistryCenter(), job, jobConfigPOJO.toJobConfiguration());
- oneOffJobBootstrap.execute();
- job.setOneOffJobBootstrap(oneOffJobBootstrap);
+ PipelineJobConfigurationChangedEventProcessor processor = PipelineJobConfigurationChangedEventProcessorFactory.getInstance(PipelineJobIdUtils.parseJobType(jobConfigPOJO.getJobName()));
+ processor.process(event, jobConfigPOJO);
}
}
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/spi/handler/JobConfigurationChangedHandler.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/spi/process/MigrationJobConfigurationChangedEventProcessor.java
similarity index 78%
copy from shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/spi/handler/JobConfigurationChangedHandler.java
copy to shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/spi/process/MigrationJobConfigurationChangedEventProcessor.java
index f2efe9c3bb0..75e931ba70f 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/spi/handler/JobConfigurationChangedHandler.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/spi/process/MigrationJobConfigurationChangedEventProcessor.java
@@ -15,48 +15,31 @@
* limitations under the License.
*/
-package org.apache.shardingsphere.data.pipeline.core.spi.handler;
+package org.apache.shardingsphere.data.pipeline.core.spi.process;
import lombok.extern.slf4j.Slf4j;
import org.apache.shardingsphere.data.pipeline.api.config.job.MigrationJobConfiguration;
import org.apache.shardingsphere.data.pipeline.api.config.job.yaml.YamlMigrationJobConfigurationSwapper;
+import org.apache.shardingsphere.data.pipeline.api.job.JobType;
import org.apache.shardingsphere.data.pipeline.core.api.PipelineAPIFactory;
import org.apache.shardingsphere.data.pipeline.core.context.PipelineContext;
import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobCenter;
-import org.apache.shardingsphere.data.pipeline.core.metadata.node.PipelineMetaDataNode;
import org.apache.shardingsphere.data.pipeline.scenario.migration.MigrationJob;
import org.apache.shardingsphere.data.pipeline.scenario.migration.MigrationJobPreparer;
import org.apache.shardingsphere.elasticjob.infra.pojo.JobConfigurationPOJO;
import org.apache.shardingsphere.elasticjob.lite.api.bootstrap.impl.OneOffJobBootstrap;
-import org.apache.shardingsphere.infra.util.yaml.YamlEngine;
import org.apache.shardingsphere.mode.repository.cluster.listener.DataChangedEvent;
import java.util.concurrent.CompletableFuture;
-import java.util.regex.Pattern;
/**
- * Migration pipeline meta data handler.
+ * Migration job config event processor.
*/
@Slf4j
-public final class JobConfigurationChangedHandler implements PipelineMetaDataChangedHandler {
+public final class MigrationJobConfigurationChangedEventProcessor implements PipelineJobConfigurationChangedEventProcessor {
@Override
- public Pattern getKeyPattern() {
- return PipelineMetaDataNode.CONFIG_PATTERN;
- }
-
- @Override
- public void handle(final DataChangedEvent event) {
- log.info("{} job config: {}", event.getType(), event.getKey());
- JobConfigurationPOJO jobConfigPOJO;
- try {
- jobConfigPOJO = YamlEngine.unmarshal(event.getValue(), JobConfigurationPOJO.class, true);
- // CHECKSTYLE:OFF
- } catch (final Exception ex) {
- // CHECKSTYLE:ON
- log.error("analyze job config pojo failed.", ex);
- return;
- }
+ public void process(final DataChangedEvent event, final JobConfigurationPOJO jobConfigPOJO) {
String jobId = jobConfigPOJO.getJobName();
if (jobConfigPOJO.isDisabled()) {
PipelineJobCenter.stop(jobId);
@@ -90,4 +73,9 @@ public final class JobConfigurationChangedHandler implements PipelineMetaDataCha
oneOffJobBootstrap.execute();
job.setOneOffJobBootstrap(oneOffJobBootstrap);
}
+
+ @Override
+ public String getType() {
+ return JobType.MIGRATION.getTypeName();
+ }
}
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/spi/process/PipelineJobConfigurationChangedEventProcessor.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/spi/process/PipelineJobConfigurationChangedEventProcessor.java
new file mode 100644
index 00000000000..02ef7febb6f
--- /dev/null
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/spi/process/PipelineJobConfigurationChangedEventProcessor.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.data.pipeline.core.spi.process;
+
+import org.apache.shardingsphere.elasticjob.infra.pojo.JobConfigurationPOJO;
+import org.apache.shardingsphere.infra.util.spi.type.typed.TypedSPI;
+import org.apache.shardingsphere.mode.repository.cluster.listener.DataChangedEvent;
+
+/**
+ * Pipeline job config changed event processor, selected by job type name via typed SPI.
+ */
+public interface PipelineJobConfigurationChangedEventProcessor extends TypedSPI {
+
+ /**
+ * Process job config data changed event.
+ *
+ * @param event data changed event
+ * @param jobConfigPOJO job config
+ */
+ void process(DataChangedEvent event, JobConfigurationPOJO jobConfigPOJO);
+}
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/spi/process/PipelineJobConfigurationChangedEventProcessorFactory.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/spi/process/PipelineJobConfigurationChangedEventProcessorFactory.java
new file mode 100644
index 00000000000..2ae73d39051
--- /dev/null
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/spi/process/PipelineJobConfigurationChangedEventProcessorFactory.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.data.pipeline.core.spi.process;
+
+import lombok.AccessLevel;
+import lombok.NoArgsConstructor;
+import org.apache.shardingsphere.data.pipeline.api.job.JobType;
+import org.apache.shardingsphere.infra.util.spi.ShardingSphereServiceLoader;
+import org.apache.shardingsphere.infra.util.spi.type.typed.TypedSPIRegistry;
+
+/**
+ * Pipeline job config changed event processor factory.
+ */
+@NoArgsConstructor(access = AccessLevel.PRIVATE)
+public final class PipelineJobConfigurationChangedEventProcessorFactory {
+
+ static {
+ ShardingSphereServiceLoader.register(PipelineJobConfigurationChangedEventProcessor.class);
+ }
+
+ /**
+ * Get job config changed event processor instance.
+ *
+ * @param jobType job type, its type name is used to look up the registered processor
+ * @return pipeline job config changed event processor
+ */
+ public static PipelineJobConfigurationChangedEventProcessor getInstance(final JobType jobType) {
+ return TypedSPIRegistry.getRegisteredService(PipelineJobConfigurationChangedEventProcessor.class, jobType.getTypeName());
+ }
+}
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.core.spi.process.PipelineJobConfigurationChangedEventProcessor b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.core.spi.process.PipelineJobConfigurationChangedEventProcessor
new file mode 100644
index 00000000000..55caeea6a01
--- /dev/null
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.core.spi.process.PipelineJobConfigurationChangedEventProcessor
@@ -0,0 +1,18 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+org.apache.shardingsphere.data.pipeline.core.spi.process.MigrationJobConfigurationChangedEventProcessor