You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@shardingsphere.apache.org by zh...@apache.org on 2022/09/07 13:30:51 UTC

[shardingsphere] branch master updated: Extract job configuration change event handler SPI and impl in PipelineJobExecutor for common usage (#20782)

This is an automated email from the ASF dual-hosted git repository.

zhonghongsheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new 2e175ba5d78 Extract job configuration change event handler SPI and impl in PipelineJobExecutor for common usage (#20782)
2e175ba5d78 is described below

commit 2e175ba5d780b97d9b0f3ec5e145274da9ce3b51
Author: Xinze Guo <10...@users.noreply.github.com>
AuthorDate: Wed Sep 7 21:30:43 2022 +0800

    Extract job configuration change event handler SPI and impl in PipelineJobExecutor for common usage (#20782)
    
    * Extract job configuration change event handler SPI and impl in PipelineJobExecutor for common usage
    
    * Fix codestyle
    
    * Add processor
    
    * Improve comment
---
 .../handler/JobConfigurationChangedHandler.java    | 48 +++-------------------
 ...tionJobConfigurationChangedEventProcessor.java} | 32 +++++----------
 ...elineJobConfigurationChangedEventProcessor.java | 36 ++++++++++++++++
 ...bConfigurationChangedEventProcessorFactory.java | 45 ++++++++++++++++++++
 ...s.PipelineJobConfigurationChangedEventProcessor | 18 ++++++++
 5 files changed, 115 insertions(+), 64 deletions(-)

diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/spi/handler/JobConfigurationChangedHandler.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/spi/handler/JobConfigurationChangedHandler.java
index f2efe9c3bb0..f6e849f3902 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/spi/handler/JobConfigurationChangedHandler.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/spi/handler/JobConfigurationChangedHandler.java
@@ -18,24 +18,18 @@
 package org.apache.shardingsphere.data.pipeline.core.spi.handler;
 
 import lombok.extern.slf4j.Slf4j;
-import org.apache.shardingsphere.data.pipeline.api.config.job.MigrationJobConfiguration;
-import org.apache.shardingsphere.data.pipeline.api.config.job.yaml.YamlMigrationJobConfigurationSwapper;
-import org.apache.shardingsphere.data.pipeline.core.api.PipelineAPIFactory;
-import org.apache.shardingsphere.data.pipeline.core.context.PipelineContext;
-import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobCenter;
+import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobIdUtils;
 import org.apache.shardingsphere.data.pipeline.core.metadata.node.PipelineMetaDataNode;
-import org.apache.shardingsphere.data.pipeline.scenario.migration.MigrationJob;
-import org.apache.shardingsphere.data.pipeline.scenario.migration.MigrationJobPreparer;
+import org.apache.shardingsphere.data.pipeline.core.spi.process.PipelineJobConfigurationChangedEventProcessor;
+import org.apache.shardingsphere.data.pipeline.core.spi.process.PipelineJobConfigurationChangedEventProcessorFactory;
 import org.apache.shardingsphere.elasticjob.infra.pojo.JobConfigurationPOJO;
-import org.apache.shardingsphere.elasticjob.lite.api.bootstrap.impl.OneOffJobBootstrap;
 import org.apache.shardingsphere.infra.util.yaml.YamlEngine;
 import org.apache.shardingsphere.mode.repository.cluster.listener.DataChangedEvent;
 
-import java.util.concurrent.CompletableFuture;
 import java.util.regex.Pattern;
 
 /**
- * Migration pipeline meta data handler.
+ * Job config changed handler.
  */
 @Slf4j
 public final class JobConfigurationChangedHandler implements PipelineMetaDataChangedHandler {
@@ -57,37 +51,7 @@ public final class JobConfigurationChangedHandler implements PipelineMetaDataCha
             log.error("analyze job config pojo failed.", ex);
             return;
         }
-        String jobId = jobConfigPOJO.getJobName();
-        if (jobConfigPOJO.isDisabled()) {
-            PipelineJobCenter.stop(jobId);
-            return;
-        }
-        switch (event.getType()) {
-            case ADDED:
-            case UPDATED:
-                if (PipelineJobCenter.isJobExisting(jobConfigPOJO.getJobName())) {
-                    log.info("{} added to executing jobs failed since it already exists", jobConfigPOJO.getJobName());
-                } else {
-                    log.info("{} executing jobs", jobConfigPOJO.getJobName());
-                    CompletableFuture.runAsync(() -> execute(jobConfigPOJO), PipelineContext.getEventListenerExecutor());
-                }
-                break;
-            case DELETED:
-                log.info("deleted jobId={}", jobId);
-                MigrationJobConfiguration jobConfig = YamlMigrationJobConfigurationSwapper.swapToObject(jobConfigPOJO.getJobParameter());
-                new MigrationJobPreparer().cleanup(jobConfig);
-                PipelineJobCenter.stop(jobConfigPOJO.getJobName());
-                break;
-            default:
-                break;
-        }
-    }
-    
-    private void execute(final JobConfigurationPOJO jobConfigPOJO) {
-        MigrationJob job = new MigrationJob();
-        PipelineJobCenter.addJob(jobConfigPOJO.getJobName(), job);
-        OneOffJobBootstrap oneOffJobBootstrap = new OneOffJobBootstrap(PipelineAPIFactory.getRegistryCenter(), job, jobConfigPOJO.toJobConfiguration());
-        oneOffJobBootstrap.execute();
-        job.setOneOffJobBootstrap(oneOffJobBootstrap);
+        PipelineJobConfigurationChangedEventProcessor processor = PipelineJobConfigurationChangedEventProcessorFactory.getInstance(PipelineJobIdUtils.parseJobType(jobConfigPOJO.getJobName()));
+        processor.process(event, jobConfigPOJO);
     }
 }
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/spi/handler/JobConfigurationChangedHandler.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/spi/process/MigrationJobConfigurationChangedEventProcessor.java
similarity index 78%
copy from shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/spi/handler/JobConfigurationChangedHandler.java
copy to shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/spi/process/MigrationJobConfigurationChangedEventProcessor.java
index f2efe9c3bb0..75e931ba70f 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/spi/handler/JobConfigurationChangedHandler.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/spi/process/MigrationJobConfigurationChangedEventProcessor.java
@@ -15,48 +15,31 @@
  * limitations under the License.
  */
 
-package org.apache.shardingsphere.data.pipeline.core.spi.handler;
+package org.apache.shardingsphere.data.pipeline.core.spi.process;
 
 import lombok.extern.slf4j.Slf4j;
 import org.apache.shardingsphere.data.pipeline.api.config.job.MigrationJobConfiguration;
 import org.apache.shardingsphere.data.pipeline.api.config.job.yaml.YamlMigrationJobConfigurationSwapper;
+import org.apache.shardingsphere.data.pipeline.api.job.JobType;
 import org.apache.shardingsphere.data.pipeline.core.api.PipelineAPIFactory;
 import org.apache.shardingsphere.data.pipeline.core.context.PipelineContext;
 import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobCenter;
-import org.apache.shardingsphere.data.pipeline.core.metadata.node.PipelineMetaDataNode;
 import org.apache.shardingsphere.data.pipeline.scenario.migration.MigrationJob;
 import org.apache.shardingsphere.data.pipeline.scenario.migration.MigrationJobPreparer;
 import org.apache.shardingsphere.elasticjob.infra.pojo.JobConfigurationPOJO;
 import org.apache.shardingsphere.elasticjob.lite.api.bootstrap.impl.OneOffJobBootstrap;
-import org.apache.shardingsphere.infra.util.yaml.YamlEngine;
 import org.apache.shardingsphere.mode.repository.cluster.listener.DataChangedEvent;
 
 import java.util.concurrent.CompletableFuture;
-import java.util.regex.Pattern;
 
 /**
- * Migration pipeline meta data handler.
+ * Migration job config event processor.
  */
 @Slf4j
-public final class JobConfigurationChangedHandler implements PipelineMetaDataChangedHandler {
+public final class MigrationJobConfigurationChangedEventProcessor implements PipelineJobConfigurationChangedEventProcessor {
     
     @Override
-    public Pattern getKeyPattern() {
-        return PipelineMetaDataNode.CONFIG_PATTERN;
-    }
-    
-    @Override
-    public void handle(final DataChangedEvent event) {
-        log.info("{} job config: {}", event.getType(), event.getKey());
-        JobConfigurationPOJO jobConfigPOJO;
-        try {
-            jobConfigPOJO = YamlEngine.unmarshal(event.getValue(), JobConfigurationPOJO.class, true);
-            // CHECKSTYLE:OFF
-        } catch (final Exception ex) {
-            // CHECKSTYLE:ON
-            log.error("analyze job config pojo failed.", ex);
-            return;
-        }
+    public void process(final DataChangedEvent event, final JobConfigurationPOJO jobConfigPOJO) {
         String jobId = jobConfigPOJO.getJobName();
         if (jobConfigPOJO.isDisabled()) {
             PipelineJobCenter.stop(jobId);
@@ -90,4 +73,9 @@ public final class JobConfigurationChangedHandler implements PipelineMetaDataCha
         oneOffJobBootstrap.execute();
         job.setOneOffJobBootstrap(oneOffJobBootstrap);
     }
+    
+    @Override
+    public String getType() {
+        return JobType.MIGRATION.getTypeName();
+    }
 }
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/spi/process/PipelineJobConfigurationChangedEventProcessor.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/spi/process/PipelineJobConfigurationChangedEventProcessor.java
new file mode 100644
index 00000000000..02ef7febb6f
--- /dev/null
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/spi/process/PipelineJobConfigurationChangedEventProcessor.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.data.pipeline.core.spi.process;
+
+import org.apache.shardingsphere.elasticjob.infra.pojo.JobConfigurationPOJO;
+import org.apache.shardingsphere.infra.util.spi.type.typed.TypedSPI;
+import org.apache.shardingsphere.mode.repository.cluster.listener.DataChangedEvent;
+
+/**
+ * Pipeline job config changed event processor.
+ */
+public interface PipelineJobConfigurationChangedEventProcessor extends TypedSPI {
+    
+    /**
+     * Process data changed event.
+     *
+     * @param jobConfigPOJO job config
+     * @param event event
+     */
+    void process(DataChangedEvent event, JobConfigurationPOJO jobConfigPOJO);
+}
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/spi/process/PipelineJobConfigurationChangedEventProcessorFactory.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/spi/process/PipelineJobConfigurationChangedEventProcessorFactory.java
new file mode 100644
index 00000000000..2ae73d39051
--- /dev/null
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/spi/process/PipelineJobConfigurationChangedEventProcessorFactory.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.data.pipeline.core.spi.process;
+
+import lombok.AccessLevel;
+import lombok.NoArgsConstructor;
+import org.apache.shardingsphere.data.pipeline.api.job.JobType;
+import org.apache.shardingsphere.infra.util.spi.ShardingSphereServiceLoader;
+import org.apache.shardingsphere.infra.util.spi.type.typed.TypedSPIRegistry;
+
+/**
+ * Pipeline job config changed event processor factory.
+ */
+@NoArgsConstructor(access = AccessLevel.PRIVATE)
+public final class PipelineJobConfigurationChangedEventProcessorFactory {
+    
+    static {
+        ShardingSphereServiceLoader.register(PipelineJobConfigurationChangedEventProcessor.class);
+    }
+    
+    /**
+     * Get job changed event processor instance.
+     *
+     * @param jobType job type
+     * @return pipeline event process
+     */
+    public static PipelineJobConfigurationChangedEventProcessor getInstance(final JobType jobType) {
+        return TypedSPIRegistry.getRegisteredService(PipelineJobConfigurationChangedEventProcessor.class, jobType.getTypeName());
+    }
+}
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.core.spi.process.PipelineJobConfigurationChangedEventProcessor b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.core.spi.process.PipelineJobConfigurationChangedEventProcessor
new file mode 100644
index 00000000000..55caeea6a01
--- /dev/null
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.core.spi.process.PipelineJobConfigurationChangedEventProcessor
@@ -0,0 +1,18 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+org.apache.shardingsphere.data.pipeline.core.spi.process.MigrationJobConfigurationChangedEventProcessor