You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@shardingsphere.apache.org by zh...@apache.org on 2023/03/28 02:48:02 UTC
[shardingsphere] branch master updated: Add pipeline elastic job listener (#24807)
This is an automated email from the ASF dual-hosted git repository.
zhonghongsheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new d865f39e9d9 Add pipeline elastic job listener (#24807)
d865f39e9d9 is described below
commit d865f39e9d9e73865b097c009962a6a6afb456d8
Author: Xinze Guo <10...@users.noreply.github.com>
AuthorDate: Tue Mar 28 10:47:54 2023 +0800
Add pipeline elastic job listener (#24807)
* Add pipeline job listener
* Adjust persist barrier position and sleep time
* Improve class name
* stop consistency check job before drop
* Fix ci timeout
* Fix stop DistSQL blocked when job already stopped
---
.../core/api/impl/AbstractPipelineJobAPIImpl.java | 6 ++
.../pipeline/core/job/AbstractPipelineJob.java | 29 +++++++++-
.../PipelineContextManagerLifecycleListener.java | 3 +
.../core/listener/PipelineElasticJobListener.java | 64 ++++++++++++++++++++++
.../AbstractChangedJobConfigurationProcessor.java | 13 +++--
.../core/util/PipelineDistributedBarrier.java | 2 +-
...re.elasticjob.infra.listener.ElasticJobListener | 18 ++++++
7 files changed, 126 insertions(+), 9 deletions(-)
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/AbstractPipelineJobAPIImpl.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/AbstractPipelineJobAPIImpl.java
index 919e3fc09ba..a158ba9ea1a 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/AbstractPipelineJobAPIImpl.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/AbstractPipelineJobAPIImpl.java
@@ -34,6 +34,7 @@ import org.apache.shardingsphere.data.pipeline.core.exception.job.PipelineJobHas
import org.apache.shardingsphere.data.pipeline.core.exception.job.PipelineJobNotFoundException;
import org.apache.shardingsphere.data.pipeline.core.exception.job.UnsupportedModeTypeException;
import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobIdUtils;
+import org.apache.shardingsphere.data.pipeline.core.listener.PipelineElasticJobListener;
import org.apache.shardingsphere.data.pipeline.core.metadata.node.PipelineMetaDataNode;
import org.apache.shardingsphere.data.pipeline.core.util.PipelineDistributedBarrier;
import org.apache.shardingsphere.elasticjob.infra.pojo.JobConfigurationPOJO;
@@ -44,6 +45,7 @@ import org.apache.shardingsphere.infra.util.yaml.YamlEngine;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
+import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
@@ -115,6 +117,7 @@ public abstract class AbstractPipelineJobAPIImpl implements PipelineJobAPI {
String createTimeFormat = LocalDateTime.now().format(DATE_TIME_FORMATTER);
result.getProps().setProperty("create_time", createTimeFormat);
result.getProps().setProperty("start_time_millis", System.currentTimeMillis() + "");
+ result.setJobListenerTypes(Collections.singletonList(PipelineElasticJobListener.class.getName()));
return result;
}
@@ -143,6 +146,9 @@ public abstract class AbstractPipelineJobAPIImpl implements PipelineJobAPI {
PipelineDistributedBarrier pipelineDistributedBarrier = PipelineDistributedBarrier.getInstance();
pipelineDistributedBarrier.unregister(PipelineMetaDataNode.getJobBarrierEnablePath(jobId));
JobConfigurationPOJO jobConfigPOJO = getElasticJobConfigPOJO(jobId);
+ if (jobConfigPOJO.isDisabled()) {
+ return;
+ }
jobConfigPOJO.setDisabled(true);
jobConfigPOJO.getProps().setProperty("stop_time", LocalDateTime.now().format(DATE_TIME_FORMATTER));
jobConfigPOJO.getProps().setProperty("stop_time_millis", System.currentTimeMillis() + "");
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractPipelineJob.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractPipelineJob.java
index 0e636e73067..5fcf64b5522 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractPipelineJob.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractPipelineJob.java
@@ -26,8 +26,11 @@ import org.apache.shardingsphere.data.pipeline.api.job.PipelineJob;
import org.apache.shardingsphere.data.pipeline.api.task.PipelineTasksRunner;
import org.apache.shardingsphere.data.pipeline.core.api.PipelineJobAPI;
import org.apache.shardingsphere.data.pipeline.core.job.progress.persist.PipelineJobProgressPersistService;
+import org.apache.shardingsphere.data.pipeline.core.listener.PipelineElasticJobListener;
import org.apache.shardingsphere.data.pipeline.core.metadata.node.PipelineMetaDataNode;
import org.apache.shardingsphere.data.pipeline.core.util.PipelineDistributedBarrier;
+import org.apache.shardingsphere.elasticjob.infra.listener.ElasticJobListener;
+import org.apache.shardingsphere.elasticjob.infra.spi.ElasticJobServiceLoader;
import org.apache.shardingsphere.elasticjob.lite.api.bootstrap.JobBootstrap;
import org.apache.shardingsphere.infra.util.spi.type.typed.TypedSPILoader;
@@ -36,6 +39,7 @@ import java.util.Collection;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeUnit;
/**
* Abstract pipeline job.
@@ -113,13 +117,32 @@ public abstract class AbstractPipelineJob implements PipelineJob {
private void innerStop() {
stopping = true;
- if (null != jobBootstrap) {
- jobBootstrap.shutdown();
- }
log.info("stop tasks runner, jobId={}", jobId);
for (PipelineTasksRunner each : tasksRunnerMap.values()) {
each.stop();
}
+ Optional<ElasticJobListener> pipelineJobListener = ElasticJobServiceLoader.getCachedTypedServiceInstance(ElasticJobListener.class, PipelineElasticJobListener.class.getName());
+ pipelineJobListener.ifPresent(jobListener -> awaitJobStopped((PipelineElasticJobListener) jobListener, jobId, TimeUnit.SECONDS.toMillis(2)));
+ if (null == jobBootstrap) {
+ return;
+ }
+ jobBootstrap.shutdown();
+ }
+
+ private void awaitJobStopped(final PipelineElasticJobListener jobListener, final String jobId, final long timeoutMillis) {
+ int time = 0;
+ int sleepTime = 50;
+ while (time < timeoutMillis) {
+ if (!jobListener.isJobRunning(jobId)) {
+ break;
+ }
+ try {
+ Thread.sleep(sleepTime);
+ } catch (final InterruptedException ignored) {
+ break;
+ }
+ time += sleepTime;
+ }
}
private void innerClean() {
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/listener/PipelineContextManagerLifecycleListener.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/listener/PipelineContextManagerLifecycleListener.java
index 093401d2779..864c16f0aa8 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/listener/PipelineContextManagerLifecycleListener.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/listener/PipelineContextManagerLifecycleListener.java
@@ -20,6 +20,8 @@ package org.apache.shardingsphere.data.pipeline.core.listener;
import lombok.extern.slf4j.Slf4j;
import org.apache.shardingsphere.data.pipeline.core.context.PipelineContext;
import org.apache.shardingsphere.data.pipeline.core.execute.PipelineJobWorker;
+import org.apache.shardingsphere.elasticjob.infra.listener.ElasticJobListener;
+import org.apache.shardingsphere.elasticjob.infra.spi.ElasticJobServiceLoader;
import org.apache.shardingsphere.infra.config.mode.ModeConfiguration;
import org.apache.shardingsphere.mode.manager.ContextManager;
import org.apache.shardingsphere.mode.manager.listener.ContextManagerLifecycleListener;
@@ -42,5 +44,6 @@ public final class PipelineContextManagerLifecycleListener implements ContextMan
PipelineContext.initModeConfig(modeConfig);
PipelineContext.initContextManager(contextManager);
PipelineJobWorker.initialize();
+ ElasticJobServiceLoader.registerTypedService(ElasticJobListener.class);
}
}
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/listener/PipelineElasticJobListener.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/listener/PipelineElasticJobListener.java
new file mode 100644
index 00000000000..7861b576f78
--- /dev/null
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/listener/PipelineElasticJobListener.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.data.pipeline.core.listener;
+
+import lombok.extern.slf4j.Slf4j;
+import org.apache.shardingsphere.elasticjob.infra.listener.ElasticJobListener;
+import org.apache.shardingsphere.elasticjob.infra.listener.ShardingContexts;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * Pipeline elastic job listener.
+ */
+@Slf4j
+public final class PipelineElasticJobListener implements ElasticJobListener {
+
+ // TODO ElasticJobListenerFactory.createListener return new class instance, it's the reason why static variables
+ private static final Map<String, Long> RUNNING_JOBS = new ConcurrentHashMap<>();
+
+ @Override
+ public void beforeJobExecuted(final ShardingContexts shardingContexts) {
+ if (RUNNING_JOBS.containsKey(shardingContexts.getJobName())) {
+ log.warn("{} already exists", shardingContexts.getJobName());
+ }
+ RUNNING_JOBS.put(shardingContexts.getJobName(), System.currentTimeMillis());
+ }
+
+ @Override
+ public void afterJobExecuted(final ShardingContexts shardingContexts) {
+ log.info("After {} job execute ", shardingContexts.getJobName());
+ RUNNING_JOBS.remove(shardingContexts.getJobName());
+ }
+
+ /**
+ * Is job running.
+ *
+ * @param jobId job id
+ * @return true if job is running otherwise false
+ */
+ public boolean isJobRunning(final String jobId) {
+ return RUNNING_JOBS.containsKey(jobId);
+ }
+
+ @Override
+ public String getType() {
+ return PipelineElasticJobListener.class.getName();
+ }
+}
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/node/config/processor/impl/AbstractChangedJobConfigurationProcessor.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/node/config/processor/impl/AbstractChangedJobConfigurationProcessor.java
index 9abebfbfdde..3c0a749f191 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/node/config/processor/impl/AbstractChangedJobConfigurationProcessor.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/node/config/processor/impl/AbstractChangedJobConfigurationProcessor.java
@@ -29,6 +29,8 @@ import org.apache.shardingsphere.elasticjob.api.JobConfiguration;
import org.apache.shardingsphere.elasticjob.lite.api.bootstrap.impl.OneOffJobBootstrap;
import org.apache.shardingsphere.mode.repository.cluster.listener.DataChangedEvent.Type;
+import java.util.Collection;
+
/**
* Abstract changed job configuration processor.
*/
@@ -38,16 +40,17 @@ public abstract class AbstractChangedJobConfigurationProcessor implements Change
@Override
public void process(final Type eventType, final JobConfiguration jobConfig) {
boolean disabled = jobConfig.isDisabled();
- if (disabled) {
- onDisabled(jobConfig);
- }
boolean deleted = Type.DELETED == eventType;
if (deleted) {
onDeleted(jobConfig);
}
String jobId = jobConfig.getJobName();
if (disabled || deleted) {
+ Collection<Integer> jobItems = PipelineJobCenter.getShardingItems(jobId);
PipelineJobCenter.stop(jobId);
+ if (disabled) {
+ onDisabled(jobConfig, jobItems);
+ }
return;
}
switch (eventType) {
@@ -64,9 +67,9 @@ public abstract class AbstractChangedJobConfigurationProcessor implements Change
}
}
- protected void onDisabled(final JobConfiguration jobConfig) {
+ protected void onDisabled(final JobConfiguration jobConfig, final Collection<Integer> jobItems) {
String jobId = jobConfig.getJobName();
- for (Integer each : PipelineJobCenter.getShardingItems(jobId)) {
+ for (Integer each : jobItems) {
PipelineDistributedBarrier.getInstance().persistEphemeralChildrenNode(PipelineMetaDataNode.getJobBarrierDisablePath(jobId), each);
}
}
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/util/PipelineDistributedBarrier.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/util/PipelineDistributedBarrier.java
index 6d3e050d903..90aca859bb9 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/util/PipelineDistributedBarrier.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/util/PipelineDistributedBarrier.java
@@ -99,7 +99,7 @@ public final class PipelineDistributedBarrier {
* @param barrierPath barrier path
*/
public void unregister(final String barrierPath) {
- getRepository().delete(String.join("/", barrierPath));
+ getRepository().delete(barrierPath);
InnerCountDownLatchHolder holder = countDownLatchHolders.remove(barrierPath);
if (null != holder) {
holder.getCountDownLatch().countDown();
diff --git a/kernel/data-pipeline/core/src/main/resources/META-INF/services/org.apache.shardingsphere.elasticjob.infra.listener.ElasticJobListener b/kernel/data-pipeline/core/src/main/resources/META-INF/services/org.apache.shardingsphere.elasticjob.infra.listener.ElasticJobListener
new file mode 100644
index 00000000000..97979f82b7f
--- /dev/null
+++ b/kernel/data-pipeline/core/src/main/resources/META-INF/services/org.apache.shardingsphere.elasticjob.infra.listener.ElasticJobListener
@@ -0,0 +1,18 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+org.apache.shardingsphere.data.pipeline.core.listener.PipelineElasticJobListener