You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@shardingsphere.apache.org by zh...@apache.org on 2022/07/06 12:16:49 UTC
[shardingsphere] branch master updated: Cancel database level exclusive lock in scaling job (#18903)
This is an automated email from the ASF dual-hosted git repository.
zhaojinchao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new 847d2977923 Cancel database level exclusive lock in scaling job (#18903)
847d2977923 is described below
commit 847d29779238e627ef3be5ce45fdbfa4a2f83e8a
Author: Hongsheng Zhong <sa...@126.com>
AuthorDate: Wed Jul 6 20:16:33 2022 +0800
Cancel database level exclusive lock in scaling job (#18903)
---
.../data/pipeline/api/job/JobStatus.java | 1 +
.../pipeline/core/execute/PipelineJobExecutor.java | 24 +++++----------
.../scenario/rulealtered/RuleAlteredJobCenter.java | 10 +++++++
.../rulealtered/RuleAlteredJobScheduler.java | 14 ++-------
.../rulealtered/RuleAlteredJobSchedulerCenter.java | 9 ------
.../scenario/rulealtered/RuleAlteredJobWorker.java | 35 ----------------------
.../rule/ScalingReleaseDatabaseLevelLockEvent.java | 31 -------------------
7 files changed, 21 insertions(+), 103 deletions(-)
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/job/JobStatus.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/job/JobStatus.java
index da423a47613..7741b8385f7 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/job/JobStatus.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/job/JobStatus.java
@@ -47,6 +47,7 @@ public enum JobStatus {
*/
EXECUTE_INCREMENTAL_TASK(true),
+ // TODO rename to SUCCESS
/**
* Job is finished.
*/
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/execute/PipelineJobExecutor.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/execute/PipelineJobExecutor.java
index ead132e38ce..d3d2c33623d 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/execute/PipelineJobExecutor.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/execute/PipelineJobExecutor.java
@@ -24,17 +24,13 @@ import org.apache.shardingsphere.data.pipeline.api.config.rulealtered.yaml.RuleA
import org.apache.shardingsphere.data.pipeline.api.executor.AbstractLifecycleExecutor;
import org.apache.shardingsphere.data.pipeline.core.api.PipelineAPIFactory;
import org.apache.shardingsphere.data.pipeline.core.constant.DataPipelineConstants;
-import org.apache.shardingsphere.data.pipeline.core.lock.PipelineSimpleLock;
import org.apache.shardingsphere.data.pipeline.scenario.rulealtered.RuleAlteredJob;
import org.apache.shardingsphere.data.pipeline.scenario.rulealtered.RuleAlteredJobCenter;
import org.apache.shardingsphere.data.pipeline.scenario.rulealtered.RuleAlteredJobPreparer;
import org.apache.shardingsphere.data.pipeline.scenario.rulealtered.RuleAlteredJobProgressDetector;
-import org.apache.shardingsphere.data.pipeline.scenario.rulealtered.RuleAlteredJobSchedulerCenter;
import org.apache.shardingsphere.elasticjob.infra.pojo.JobConfigurationPOJO;
import org.apache.shardingsphere.elasticjob.lite.api.bootstrap.impl.OneOffJobBootstrap;
-import org.apache.shardingsphere.infra.eventbus.ShardingSphereEventBus;
import org.apache.shardingsphere.infra.yaml.engine.YamlEngine;
-import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.config.event.rule.ScalingReleaseDatabaseLevelLockEvent;
import org.apache.shardingsphere.mode.repository.cluster.listener.DataChangedEvent;
import java.util.Optional;
@@ -75,8 +71,8 @@ public final class PipelineJobExecutor extends AbstractLifecycleExecutor {
boolean isDeleted = DataChangedEvent.Type.DELETED == event.getType();
boolean isDisabled = jobConfigPOJO.isDisabled();
if (isDeleted || isDisabled) {
- log.info("jobId={}, deleted={}, disabled={}", jobConfigPOJO.getJobName(), isDeleted, isDisabled);
- RuleAlteredJobCenter.stop(jobConfigPOJO.getJobName());
+ String jobId = jobConfigPOJO.getJobName();
+ log.info("jobId={}, deleted={}, disabled={}", jobId, isDeleted, isDisabled);
// TODO refactor: dispatch to different job types
RuleAlteredJobConfiguration jobConfig = RuleAlteredJobConfigurationSwapper.swapToObject(jobConfigPOJO.getJobParameter());
if (isDeleted) {
@@ -85,13 +81,13 @@ public final class PipelineJobExecutor extends AbstractLifecycleExecutor {
log.info("isJobSuccessful=true");
new RuleAlteredJobPreparer().cleanup(jobConfig);
}
- ShardingSphereEventBus.getInstance().post(new ScalingReleaseDatabaseLevelLockEvent(jobConfig.getDatabaseName()));
+ RuleAlteredJobCenter.stop(jobId);
return;
}
switch (event.getType()) {
case ADDED:
case UPDATED:
- if (RuleAlteredJobSchedulerCenter.existJob(jobConfigPOJO.getJobName())) {
+ if (RuleAlteredJobCenter.isJobExisting(jobConfigPOJO.getJobName())) {
log.info("{} added to executing jobs failed since it already exists", jobConfigPOJO.getJobName());
} else {
executor.execute(() -> execute(jobConfigPOJO));
@@ -103,15 +99,9 @@ public final class PipelineJobExecutor extends AbstractLifecycleExecutor {
}
private void execute(final JobConfigurationPOJO jobConfigPOJO) {
- String databaseName = RuleAlteredJobConfigurationSwapper.swapToObject(jobConfigPOJO.getJobParameter()).getDatabaseName();
- if (PipelineSimpleLock.getInstance().tryLock(databaseName, 3000)) {
- log.info("{} added to executing jobs success", jobConfigPOJO.getJobName());
- RuleAlteredJob job = new RuleAlteredJob();
- RuleAlteredJobCenter.addJob(jobConfigPOJO.getJobName(), job);
- new OneOffJobBootstrap(PipelineAPIFactory.getRegistryCenter(), job, jobConfigPOJO.toJobConfiguration()).execute();
- } else {
- log.info("tryLock failed, databaseName={}", databaseName);
- }
+ RuleAlteredJob job = new RuleAlteredJob();
+ RuleAlteredJobCenter.addJob(jobConfigPOJO.getJobName(), job);
+ new OneOffJobBootstrap(PipelineAPIFactory.getRegistryCenter(), job, jobConfigPOJO.toJobConfiguration()).execute();
}
@Override
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobCenter.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobCenter.java
index 478089aa45b..dfbdcedaafd 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobCenter.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobCenter.java
@@ -40,6 +40,16 @@ public final class RuleAlteredJobCenter {
JOB_MAP.put(jobId, job);
}
+ /**
+ * Is job existing.
+ *
+ * @param jobId job id
+ * @return true when job exists, else false
+ */
+ public static boolean isJobExisting(final String jobId) {
+ return JOB_MAP.containsKey(jobId);
+ }
+
/**
* Stop job.
*
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobScheduler.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobScheduler.java
index 23bc6d15159..de7f6fcac61 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobScheduler.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobScheduler.java
@@ -28,8 +28,6 @@ import org.apache.shardingsphere.data.pipeline.core.exception.PipelineIgnoredExc
import org.apache.shardingsphere.data.pipeline.core.execute.ExecuteCallback;
import org.apache.shardingsphere.data.pipeline.core.task.IncrementalTask;
import org.apache.shardingsphere.data.pipeline.core.task.InventoryTask;
-import org.apache.shardingsphere.infra.eventbus.ShardingSphereEventBus;
-import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.config.event.rule.ScalingReleaseDatabaseLevelLockEvent;
/**
* Rule altered job scheduler.
@@ -55,6 +53,7 @@ public final class RuleAlteredJobScheduler implements Runnable {
public void stop() {
jobContext.setStopping(true);
log.info("stop, jobId={}, shardingItem={}", jobContext.getJobId(), jobContext.getShardingItem());
+ // TODO blocking stop
for (InventoryTask each : jobContext.getInventoryTasks()) {
log.info("stop inventory task {} - {}", jobContext.getJobId(), each.getTaskId());
each.stop();
@@ -76,7 +75,6 @@ public final class RuleAlteredJobScheduler implements Runnable {
} catch (final PipelineIgnoredException ex) {
log.info("pipeline ignore exception: {}", ex.getMessage());
RuleAlteredJobCenter.stop(jobId);
- ShardingSphereEventBus.getInstance().post(new ScalingReleaseDatabaseLevelLockEvent(jobContext.getJobConfig().getDatabaseName()));
// CHECKSTYLE:OFF
} catch (final RuntimeException ex) {
// CHECKSTYLE:ON
@@ -84,8 +82,6 @@ public final class RuleAlteredJobScheduler implements Runnable {
RuleAlteredJobCenter.stop(jobId);
jobContext.setStatus(JobStatus.PREPARING_FAILURE);
governanceRepositoryAPI.persistJobProgress(jobContext);
- ScalingReleaseDatabaseLevelLockEvent event = new ScalingReleaseDatabaseLevelLockEvent(jobContext.getJobConfig().getDatabaseName());
- ShardingSphereEventBus.getInstance().post(event);
throw ex;
}
if (jobContext.isStopping()) {
@@ -133,10 +129,8 @@ public final class RuleAlteredJobScheduler implements Runnable {
@Override
public void onFailure(final Throwable throwable) {
log.error("Inventory task execute failed.", throwable);
- stop();
jobContext.setStatus(JobStatus.EXECUTE_INVENTORY_TASK_FAILURE);
- ScalingReleaseDatabaseLevelLockEvent event = new ScalingReleaseDatabaseLevelLockEvent(jobContext.getJobConfig().getDatabaseName());
- ShardingSphereEventBus.getInstance().post(event);
+ stop();
}
};
}
@@ -167,10 +161,8 @@ public final class RuleAlteredJobScheduler implements Runnable {
@Override
public void onFailure(final Throwable throwable) {
log.error("Incremental task execute failed.", throwable);
- stop();
jobContext.setStatus(JobStatus.EXECUTE_INCREMENTAL_TASK_FAILURE);
- ScalingReleaseDatabaseLevelLockEvent event = new ScalingReleaseDatabaseLevelLockEvent(jobContext.getJobConfig().getDatabaseName());
- ShardingSphereEventBus.getInstance().post(event);
+ stop();
}
};
}
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobSchedulerCenter.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobSchedulerCenter.java
index 24bf821128f..6b0f22b27b6 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobSchedulerCenter.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobSchedulerCenter.java
@@ -85,15 +85,6 @@ public final class RuleAlteredJobSchedulerCenter {
JOB_SCHEDULER_MAP.remove(jobId);
}
- /**
- * Check whether the same job exists.
- * @param jobId job id
- * @return exist then true else false
- */
- public static boolean existJob(final String jobId) {
- return JOB_SCHEDULER_MAP.containsKey(jobId);
- }
-
/**
* Update job status for all job sharding.
*
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobWorker.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobWorker.java
index 094a0d07f05..46227470e7d 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobWorker.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobWorker.java
@@ -32,27 +32,21 @@ import org.apache.shardingsphere.data.pipeline.api.datasource.config.impl.Shardi
import org.apache.shardingsphere.data.pipeline.api.datasource.config.yaml.YamlPipelineDataSourceConfiguration;
import org.apache.shardingsphere.data.pipeline.api.job.JobStatus;
import org.apache.shardingsphere.data.pipeline.api.pojo.JobInfo;
-import org.apache.shardingsphere.data.pipeline.core.context.PipelineContext;
import org.apache.shardingsphere.data.pipeline.core.exception.PipelineJobCreationException;
import org.apache.shardingsphere.data.pipeline.core.execute.FinishedCheckJobExecutor;
import org.apache.shardingsphere.data.pipeline.core.execute.PipelineJobExecutor;
-import org.apache.shardingsphere.data.pipeline.core.lock.PipelineSimpleLock;
import org.apache.shardingsphere.data.pipeline.spi.rulealtered.RuleAlteredDetector;
import org.apache.shardingsphere.data.pipeline.spi.rulealtered.RuleAlteredDetectorFactory;
import org.apache.shardingsphere.data.pipeline.spi.rulealtered.RuleAlteredJobConfigurationPreparerFactory;
import org.apache.shardingsphere.infra.config.RuleConfiguration;
import org.apache.shardingsphere.infra.config.rulealtered.OnRuleAlteredActionConfiguration;
import org.apache.shardingsphere.infra.eventbus.ShardingSphereEventBus;
-import org.apache.shardingsphere.infra.lock.LockContext;
-import org.apache.shardingsphere.infra.lock.LockNameDefinition;
import org.apache.shardingsphere.infra.yaml.config.pojo.YamlRootConfiguration;
import org.apache.shardingsphere.infra.yaml.config.pojo.YamlRuleConfiguration;
import org.apache.shardingsphere.infra.yaml.config.swapper.YamlRuleConfigurationSwapperEngine;
import org.apache.shardingsphere.infra.yaml.engine.YamlEngine;
import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.cache.event.StartScalingEvent;
-import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.config.event.rule.ScalingReleaseDatabaseLevelLockEvent;
import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.config.event.rule.ScalingTaskFinishedEvent;
-import org.apache.shardingsphere.mode.manager.lock.definition.LockNameDefinitionFactory;
import java.util.Collection;
import java.util.HashMap;
@@ -174,7 +168,6 @@ public final class RuleAlteredJobWorker {
log.info("Switch rule configuration immediately.");
ScalingTaskFinishedEvent taskFinishedEvent = new ScalingTaskFinishedEvent(event.getDatabaseName(), event.getActiveVersion(), event.getNewVersion());
ShardingSphereEventBus.getInstance().post(taskFinishedEvent);
- ShardingSphereEventBus.getInstance().post(new ScalingReleaseDatabaseLevelLockEvent(event.getDatabaseName()));
}
}
@@ -276,32 +269,4 @@ public final class RuleAlteredJobWorker {
}
return result;
}
-
- /**
- * scaling release database level lock.
- *
- * @param event scaling release database level lock event
- */
- @Subscribe
- public void scalingReleaseDatabaseLevelLock(final ScalingReleaseDatabaseLevelLockEvent event) {
- String databaseName = event.getDatabaseName();
- try {
- restoreSourceWriting(databaseName);
- // CHECKSTYLE:OFF
- } catch (final RuntimeException ex) {
- // CHECKSTYLE:ON
- log.error("restore source writing failed, databaseName={}", databaseName, ex);
- }
- PipelineSimpleLock.getInstance().releaseLock(event.getDatabaseName());
- }
-
- private void restoreSourceWriting(final String databaseName) {
- log.info("restoreSourceWriting, databaseName={}", databaseName);
- LockContext lockContext = PipelineContext.getContextManager().getInstanceContext().getLockContext();
- LockNameDefinition lockNameDefinition = LockNameDefinitionFactory.newDatabaseDefinition(databaseName);
- if (lockContext.isLocked(lockNameDefinition)) {
- log.info("Source writing is still stopped on database '{}', restore it now", databaseName);
- lockContext.releaseLock(lockNameDefinition);
- }
- }
}
diff --git a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/config/event/rule/ScalingReleaseDatabaseLevelLockEvent.java b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/config/event/rule/ScalingReleaseDatabaseLe [...]
deleted file mode 100644
index 08232f14441..00000000000
--- a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/config/event/rule/ScalingReleaseDatabaseLevelLockEvent.java
+++ /dev/null
@@ -1,31 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.config.event.rule;
-
-import lombok.Getter;
-import lombok.RequiredArgsConstructor;
-
-/**
- * Scaling release database level lock event.
- */
-@RequiredArgsConstructor
-@Getter
-public final class ScalingReleaseDatabaseLevelLockEvent {
-
- private final String databaseName;
-}