You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@shardingsphere.apache.org by zh...@apache.org on 2023/03/28 02:48:02 UTC

[shardingsphere] branch master updated: Add pipeline elastic job listener (#24807)

This is an automated email from the ASF dual-hosted git repository.

zhonghongsheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new d865f39e9d9 Add pipeline elastic job listener (#24807)
d865f39e9d9 is described below

commit d865f39e9d9e73865b097c009962a6a6afb456d8
Author: Xinze Guo <10...@users.noreply.github.com>
AuthorDate: Tue Mar 28 10:47:54 2023 +0800

    Add pipeline elastic job listener (#24807)
    
    * Add pipeline job listener
    
    * Adjust persist barrier position and sleep time
    
    * Improve class name
    
    * stop consistency check job before drop
    
    * Fix ci timeout
    
    * Fix stop DistSQL blocked when job already stopped
---
 .../core/api/impl/AbstractPipelineJobAPIImpl.java  |  6 ++
 .../pipeline/core/job/AbstractPipelineJob.java     | 29 +++++++++-
 .../PipelineContextManagerLifecycleListener.java   |  3 +
 .../core/listener/PipelineElasticJobListener.java  | 64 ++++++++++++++++++++++
 .../AbstractChangedJobConfigurationProcessor.java  | 13 +++--
 .../core/util/PipelineDistributedBarrier.java      |  2 +-
 ...re.elasticjob.infra.listener.ElasticJobListener | 18 ++++++
 7 files changed, 126 insertions(+), 9 deletions(-)

diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/AbstractPipelineJobAPIImpl.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/AbstractPipelineJobAPIImpl.java
index 919e3fc09ba..a158ba9ea1a 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/AbstractPipelineJobAPIImpl.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/AbstractPipelineJobAPIImpl.java
@@ -34,6 +34,7 @@ import org.apache.shardingsphere.data.pipeline.core.exception.job.PipelineJobHas
 import org.apache.shardingsphere.data.pipeline.core.exception.job.PipelineJobNotFoundException;
 import org.apache.shardingsphere.data.pipeline.core.exception.job.UnsupportedModeTypeException;
 import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobIdUtils;
+import org.apache.shardingsphere.data.pipeline.core.listener.PipelineElasticJobListener;
 import org.apache.shardingsphere.data.pipeline.core.metadata.node.PipelineMetaDataNode;
 import org.apache.shardingsphere.data.pipeline.core.util.PipelineDistributedBarrier;
 import org.apache.shardingsphere.elasticjob.infra.pojo.JobConfigurationPOJO;
@@ -44,6 +45,7 @@ import org.apache.shardingsphere.infra.util.yaml.YamlEngine;
 
 import java.time.LocalDateTime;
 import java.time.format.DateTimeFormatter;
+import java.util.Collections;
 import java.util.List;
 import java.util.Optional;
 import java.util.concurrent.TimeUnit;
@@ -115,6 +117,7 @@ public abstract class AbstractPipelineJobAPIImpl implements PipelineJobAPI {
         String createTimeFormat = LocalDateTime.now().format(DATE_TIME_FORMATTER);
         result.getProps().setProperty("create_time", createTimeFormat);
         result.getProps().setProperty("start_time_millis", System.currentTimeMillis() + "");
+        result.setJobListenerTypes(Collections.singletonList(PipelineElasticJobListener.class.getName()));
         return result;
     }
     
@@ -143,6 +146,9 @@ public abstract class AbstractPipelineJobAPIImpl implements PipelineJobAPI {
         PipelineDistributedBarrier pipelineDistributedBarrier = PipelineDistributedBarrier.getInstance();
         pipelineDistributedBarrier.unregister(PipelineMetaDataNode.getJobBarrierEnablePath(jobId));
         JobConfigurationPOJO jobConfigPOJO = getElasticJobConfigPOJO(jobId);
+        if (jobConfigPOJO.isDisabled()) {
+            return;
+        }
         jobConfigPOJO.setDisabled(true);
         jobConfigPOJO.getProps().setProperty("stop_time", LocalDateTime.now().format(DATE_TIME_FORMATTER));
         jobConfigPOJO.getProps().setProperty("stop_time_millis", System.currentTimeMillis() + "");
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractPipelineJob.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractPipelineJob.java
index 0e636e73067..5fcf64b5522 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractPipelineJob.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractPipelineJob.java
@@ -26,8 +26,11 @@ import org.apache.shardingsphere.data.pipeline.api.job.PipelineJob;
 import org.apache.shardingsphere.data.pipeline.api.task.PipelineTasksRunner;
 import org.apache.shardingsphere.data.pipeline.core.api.PipelineJobAPI;
 import org.apache.shardingsphere.data.pipeline.core.job.progress.persist.PipelineJobProgressPersistService;
+import org.apache.shardingsphere.data.pipeline.core.listener.PipelineElasticJobListener;
 import org.apache.shardingsphere.data.pipeline.core.metadata.node.PipelineMetaDataNode;
 import org.apache.shardingsphere.data.pipeline.core.util.PipelineDistributedBarrier;
+import org.apache.shardingsphere.elasticjob.infra.listener.ElasticJobListener;
+import org.apache.shardingsphere.elasticjob.infra.spi.ElasticJobServiceLoader;
 import org.apache.shardingsphere.elasticjob.lite.api.bootstrap.JobBootstrap;
 import org.apache.shardingsphere.infra.util.spi.type.typed.TypedSPILoader;
 
@@ -36,6 +39,7 @@ import java.util.Collection;
 import java.util.Map;
 import java.util.Optional;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeUnit;
 
 /**
  * Abstract pipeline job.
@@ -113,13 +117,32 @@ public abstract class AbstractPipelineJob implements PipelineJob {
     
     private void innerStop() {
         stopping = true;
-        if (null != jobBootstrap) {
-            jobBootstrap.shutdown();
-        }
         log.info("stop tasks runner, jobId={}", jobId);
         for (PipelineTasksRunner each : tasksRunnerMap.values()) {
             each.stop();
         }
+        Optional<ElasticJobListener> pipelineJobListener = ElasticJobServiceLoader.getCachedTypedServiceInstance(ElasticJobListener.class, PipelineElasticJobListener.class.getName());
+        pipelineJobListener.ifPresent(jobListener -> awaitJobStopped((PipelineElasticJobListener) jobListener, jobId, TimeUnit.SECONDS.toMillis(2)));
+        if (null == jobBootstrap) {
+            return;
+        }
+        jobBootstrap.shutdown();
+    }
+    
+    private void awaitJobStopped(final PipelineElasticJobListener jobListener, final String jobId, final long timeoutMillis) {
+        int time = 0;
+        int sleepTime = 50;
+        while (time < timeoutMillis) {
+            if (!jobListener.isJobRunning(jobId)) {
+                break;
+            }
+            try {
+                Thread.sleep(sleepTime);
+            } catch (final InterruptedException ignored) {
+                break;
+            }
+            time += sleepTime;
+        }
     }
     
     private void innerClean() {
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/listener/PipelineContextManagerLifecycleListener.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/listener/PipelineContextManagerLifecycleListener.java
index 093401d2779..864c16f0aa8 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/listener/PipelineContextManagerLifecycleListener.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/listener/PipelineContextManagerLifecycleListener.java
@@ -20,6 +20,8 @@ package org.apache.shardingsphere.data.pipeline.core.listener;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.shardingsphere.data.pipeline.core.context.PipelineContext;
 import org.apache.shardingsphere.data.pipeline.core.execute.PipelineJobWorker;
+import org.apache.shardingsphere.elasticjob.infra.listener.ElasticJobListener;
+import org.apache.shardingsphere.elasticjob.infra.spi.ElasticJobServiceLoader;
 import org.apache.shardingsphere.infra.config.mode.ModeConfiguration;
 import org.apache.shardingsphere.mode.manager.ContextManager;
 import org.apache.shardingsphere.mode.manager.listener.ContextManagerLifecycleListener;
@@ -42,5 +44,6 @@ public final class PipelineContextManagerLifecycleListener implements ContextMan
         PipelineContext.initModeConfig(modeConfig);
         PipelineContext.initContextManager(contextManager);
         PipelineJobWorker.initialize();
+        ElasticJobServiceLoader.registerTypedService(ElasticJobListener.class);
     }
 }
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/listener/PipelineElasticJobListener.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/listener/PipelineElasticJobListener.java
new file mode 100644
index 00000000000..7861b576f78
--- /dev/null
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/listener/PipelineElasticJobListener.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.data.pipeline.core.listener;
+
+import lombok.extern.slf4j.Slf4j;
+import org.apache.shardingsphere.elasticjob.infra.listener.ElasticJobListener;
+import org.apache.shardingsphere.elasticjob.infra.listener.ShardingContexts;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * Pipeline elastic job listener.
+ */
+@Slf4j
+public final class PipelineElasticJobListener implements ElasticJobListener {
+    
+    // TODO ElasticJobListenerFactory.createListener return new class instance, it's the reason why static variables
+    private static final Map<String, Long> RUNNING_JOBS = new ConcurrentHashMap<>();
+    
+    @Override
+    public void beforeJobExecuted(final ShardingContexts shardingContexts) {
+        if (RUNNING_JOBS.containsKey(shardingContexts.getJobName())) {
+            log.warn("{} already exists", shardingContexts.getJobName());
+        }
+        RUNNING_JOBS.put(shardingContexts.getJobName(), System.currentTimeMillis());
+    }
+    
+    @Override
+    public void afterJobExecuted(final ShardingContexts shardingContexts) {
+        log.info("After {} job execute ", shardingContexts.getJobName());
+        RUNNING_JOBS.remove(shardingContexts.getJobName());
+    }
+    
+    /**
+     * Is job running.
+     *
+     * @param jobId job id
+     * @return true if job is running otherwise false
+     */
+    public boolean isJobRunning(final String jobId) {
+        return RUNNING_JOBS.containsKey(jobId);
+    }
+    
+    @Override
+    public String getType() {
+        return PipelineElasticJobListener.class.getName();
+    }
+}
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/node/config/processor/impl/AbstractChangedJobConfigurationProcessor.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/node/config/processor/impl/AbstractChangedJobConfigurationProcessor.java
index 9abebfbfdde..3c0a749f191 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/node/config/processor/impl/AbstractChangedJobConfigurationProcessor.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/node/config/processor/impl/AbstractChangedJobConfigurationProcessor.java
@@ -29,6 +29,8 @@ import org.apache.shardingsphere.elasticjob.api.JobConfiguration;
 import org.apache.shardingsphere.elasticjob.lite.api.bootstrap.impl.OneOffJobBootstrap;
 import org.apache.shardingsphere.mode.repository.cluster.listener.DataChangedEvent.Type;
 
+import java.util.Collection;
+
 /**
  * Abstract changed job configuration processor.
  */
@@ -38,16 +40,17 @@ public abstract class AbstractChangedJobConfigurationProcessor implements Change
     @Override
     public void process(final Type eventType, final JobConfiguration jobConfig) {
         boolean disabled = jobConfig.isDisabled();
-        if (disabled) {
-            onDisabled(jobConfig);
-        }
         boolean deleted = Type.DELETED == eventType;
         if (deleted) {
             onDeleted(jobConfig);
         }
         String jobId = jobConfig.getJobName();
         if (disabled || deleted) {
+            Collection<Integer> jobItems = PipelineJobCenter.getShardingItems(jobId);
             PipelineJobCenter.stop(jobId);
+            if (disabled) {
+                onDisabled(jobConfig, jobItems);
+            }
             return;
         }
         switch (eventType) {
@@ -64,9 +67,9 @@ public abstract class AbstractChangedJobConfigurationProcessor implements Change
         }
     }
     
-    protected void onDisabled(final JobConfiguration jobConfig) {
+    protected void onDisabled(final JobConfiguration jobConfig, final Collection<Integer> jobItems) {
         String jobId = jobConfig.getJobName();
-        for (Integer each : PipelineJobCenter.getShardingItems(jobId)) {
+        for (Integer each : jobItems) {
             PipelineDistributedBarrier.getInstance().persistEphemeralChildrenNode(PipelineMetaDataNode.getJobBarrierDisablePath(jobId), each);
         }
     }
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/util/PipelineDistributedBarrier.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/util/PipelineDistributedBarrier.java
index 6d3e050d903..90aca859bb9 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/util/PipelineDistributedBarrier.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/util/PipelineDistributedBarrier.java
@@ -99,7 +99,7 @@ public final class PipelineDistributedBarrier {
      * @param barrierPath barrier path
      */
     public void unregister(final String barrierPath) {
-        getRepository().delete(String.join("/", barrierPath));
+        getRepository().delete(barrierPath);
         InnerCountDownLatchHolder holder = countDownLatchHolders.remove(barrierPath);
         if (null != holder) {
             holder.getCountDownLatch().countDown();
diff --git a/kernel/data-pipeline/core/src/main/resources/META-INF/services/org.apache.shardingsphere.elasticjob.infra.listener.ElasticJobListener b/kernel/data-pipeline/core/src/main/resources/META-INF/services/org.apache.shardingsphere.elasticjob.infra.listener.ElasticJobListener
new file mode 100644
index 00000000000..97979f82b7f
--- /dev/null
+++ b/kernel/data-pipeline/core/src/main/resources/META-INF/services/org.apache.shardingsphere.elasticjob.infra.listener.ElasticJobListener
@@ -0,0 +1,18 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+org.apache.shardingsphere.data.pipeline.core.listener.PipelineElasticJobListener