You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@shardingsphere.apache.org by zh...@apache.org on 2022/07/06 12:16:49 UTC

[shardingsphere] branch master updated: Cancel database level exclusive lock in scaling job (#18903)

This is an automated email from the ASF dual-hosted git repository.

zhaojinchao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new 847d2977923 Cancel database level exclusive lock in scaling job (#18903)
847d2977923 is described below

commit 847d29779238e627ef3be5ce45fdbfa4a2f83e8a
Author: Hongsheng Zhong <sa...@126.com>
AuthorDate: Wed Jul 6 20:16:33 2022 +0800

    Cancel database level exclusive lock in scaling job (#18903)
---
 .../data/pipeline/api/job/JobStatus.java           |  1 +
 .../pipeline/core/execute/PipelineJobExecutor.java | 24 +++++----------
 .../scenario/rulealtered/RuleAlteredJobCenter.java | 10 +++++++
 .../rulealtered/RuleAlteredJobScheduler.java       | 14 ++-------
 .../rulealtered/RuleAlteredJobSchedulerCenter.java |  9 ------
 .../scenario/rulealtered/RuleAlteredJobWorker.java | 35 ----------------------
 .../rule/ScalingReleaseDatabaseLevelLockEvent.java | 31 -------------------
 7 files changed, 21 insertions(+), 103 deletions(-)

diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/job/JobStatus.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/job/JobStatus.java
index da423a47613..7741b8385f7 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/job/JobStatus.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/job/JobStatus.java
@@ -47,6 +47,7 @@ public enum JobStatus {
      */
     EXECUTE_INCREMENTAL_TASK(true),
     
+    // TODO rename to SUCCESS
     /**
      * Job is finished.
      */
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/execute/PipelineJobExecutor.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/execute/PipelineJobExecutor.java
index ead132e38ce..d3d2c33623d 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/execute/PipelineJobExecutor.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/execute/PipelineJobExecutor.java
@@ -24,17 +24,13 @@ import org.apache.shardingsphere.data.pipeline.api.config.rulealtered.yaml.RuleA
 import org.apache.shardingsphere.data.pipeline.api.executor.AbstractLifecycleExecutor;
 import org.apache.shardingsphere.data.pipeline.core.api.PipelineAPIFactory;
 import org.apache.shardingsphere.data.pipeline.core.constant.DataPipelineConstants;
-import org.apache.shardingsphere.data.pipeline.core.lock.PipelineSimpleLock;
 import org.apache.shardingsphere.data.pipeline.scenario.rulealtered.RuleAlteredJob;
 import org.apache.shardingsphere.data.pipeline.scenario.rulealtered.RuleAlteredJobCenter;
 import org.apache.shardingsphere.data.pipeline.scenario.rulealtered.RuleAlteredJobPreparer;
 import org.apache.shardingsphere.data.pipeline.scenario.rulealtered.RuleAlteredJobProgressDetector;
-import org.apache.shardingsphere.data.pipeline.scenario.rulealtered.RuleAlteredJobSchedulerCenter;
 import org.apache.shardingsphere.elasticjob.infra.pojo.JobConfigurationPOJO;
 import org.apache.shardingsphere.elasticjob.lite.api.bootstrap.impl.OneOffJobBootstrap;
-import org.apache.shardingsphere.infra.eventbus.ShardingSphereEventBus;
 import org.apache.shardingsphere.infra.yaml.engine.YamlEngine;
-import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.config.event.rule.ScalingReleaseDatabaseLevelLockEvent;
 import org.apache.shardingsphere.mode.repository.cluster.listener.DataChangedEvent;
 
 import java.util.Optional;
@@ -75,8 +71,8 @@ public final class PipelineJobExecutor extends AbstractLifecycleExecutor {
         boolean isDeleted = DataChangedEvent.Type.DELETED == event.getType();
         boolean isDisabled = jobConfigPOJO.isDisabled();
         if (isDeleted || isDisabled) {
-            log.info("jobId={}, deleted={}, disabled={}", jobConfigPOJO.getJobName(), isDeleted, isDisabled);
-            RuleAlteredJobCenter.stop(jobConfigPOJO.getJobName());
+            String jobId = jobConfigPOJO.getJobName();
+            log.info("jobId={}, deleted={}, disabled={}", jobId, isDeleted, isDisabled);
             // TODO refactor: dispatch to different job types
             RuleAlteredJobConfiguration jobConfig = RuleAlteredJobConfigurationSwapper.swapToObject(jobConfigPOJO.getJobParameter());
             if (isDeleted) {
@@ -85,13 +81,13 @@ public final class PipelineJobExecutor extends AbstractLifecycleExecutor {
                 log.info("isJobSuccessful=true");
                 new RuleAlteredJobPreparer().cleanup(jobConfig);
             }
-            ShardingSphereEventBus.getInstance().post(new ScalingReleaseDatabaseLevelLockEvent(jobConfig.getDatabaseName()));
+            RuleAlteredJobCenter.stop(jobId);
             return;
         }
         switch (event.getType()) {
             case ADDED:
             case UPDATED:
-                if (RuleAlteredJobSchedulerCenter.existJob(jobConfigPOJO.getJobName())) {
+                if (RuleAlteredJobCenter.isJobExisting(jobConfigPOJO.getJobName())) {
                     log.info("{} added to executing jobs failed since it already exists", jobConfigPOJO.getJobName());
                 } else {
                     executor.execute(() -> execute(jobConfigPOJO));
@@ -103,15 +99,9 @@ public final class PipelineJobExecutor extends AbstractLifecycleExecutor {
     }
     
     private void execute(final JobConfigurationPOJO jobConfigPOJO) {
-        String databaseName = RuleAlteredJobConfigurationSwapper.swapToObject(jobConfigPOJO.getJobParameter()).getDatabaseName();
-        if (PipelineSimpleLock.getInstance().tryLock(databaseName, 3000)) {
-            log.info("{} added to executing jobs success", jobConfigPOJO.getJobName());
-            RuleAlteredJob job = new RuleAlteredJob();
-            RuleAlteredJobCenter.addJob(jobConfigPOJO.getJobName(), job);
-            new OneOffJobBootstrap(PipelineAPIFactory.getRegistryCenter(), job, jobConfigPOJO.toJobConfiguration()).execute();
-        } else {
-            log.info("tryLock failed, databaseName={}", databaseName);
-        }
+        RuleAlteredJob job = new RuleAlteredJob();
+        RuleAlteredJobCenter.addJob(jobConfigPOJO.getJobName(), job);
+        new OneOffJobBootstrap(PipelineAPIFactory.getRegistryCenter(), job, jobConfigPOJO.toJobConfiguration()).execute();
     }
     
     @Override
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobCenter.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobCenter.java
index 478089aa45b..dfbdcedaafd 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobCenter.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobCenter.java
@@ -40,6 +40,16 @@ public final class RuleAlteredJobCenter {
         JOB_MAP.put(jobId, job);
     }
     
+    /**
+     * Is job existing.
+     *
+     * @param jobId job id
+     * @return true when job exists, else false
+     */
+    public static boolean isJobExisting(final String jobId) {
+        return JOB_MAP.containsKey(jobId);
+    }
+    
     /**
      * Stop job.
      *
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobScheduler.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobScheduler.java
index 23bc6d15159..de7f6fcac61 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobScheduler.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobScheduler.java
@@ -28,8 +28,6 @@ import org.apache.shardingsphere.data.pipeline.core.exception.PipelineIgnoredExc
 import org.apache.shardingsphere.data.pipeline.core.execute.ExecuteCallback;
 import org.apache.shardingsphere.data.pipeline.core.task.IncrementalTask;
 import org.apache.shardingsphere.data.pipeline.core.task.InventoryTask;
-import org.apache.shardingsphere.infra.eventbus.ShardingSphereEventBus;
-import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.config.event.rule.ScalingReleaseDatabaseLevelLockEvent;
 
 /**
  * Rule altered job scheduler.
@@ -55,6 +53,7 @@ public final class RuleAlteredJobScheduler implements Runnable {
     public void stop() {
         jobContext.setStopping(true);
         log.info("stop, jobId={}, shardingItem={}", jobContext.getJobId(), jobContext.getShardingItem());
+        // TODO blocking stop
         for (InventoryTask each : jobContext.getInventoryTasks()) {
             log.info("stop inventory task {} - {}", jobContext.getJobId(), each.getTaskId());
             each.stop();
@@ -76,7 +75,6 @@ public final class RuleAlteredJobScheduler implements Runnable {
         } catch (final PipelineIgnoredException ex) {
             log.info("pipeline ignore exception: {}", ex.getMessage());
             RuleAlteredJobCenter.stop(jobId);
-            ShardingSphereEventBus.getInstance().post(new ScalingReleaseDatabaseLevelLockEvent(jobContext.getJobConfig().getDatabaseName()));
             // CHECKSTYLE:OFF
         } catch (final RuntimeException ex) {
             // CHECKSTYLE:ON
@@ -84,8 +82,6 @@ public final class RuleAlteredJobScheduler implements Runnable {
             RuleAlteredJobCenter.stop(jobId);
             jobContext.setStatus(JobStatus.PREPARING_FAILURE);
             governanceRepositoryAPI.persistJobProgress(jobContext);
-            ScalingReleaseDatabaseLevelLockEvent event = new ScalingReleaseDatabaseLevelLockEvent(jobContext.getJobConfig().getDatabaseName());
-            ShardingSphereEventBus.getInstance().post(event);
             throw ex;
         }
         if (jobContext.isStopping()) {
@@ -133,10 +129,8 @@ public final class RuleAlteredJobScheduler implements Runnable {
             @Override
             public void onFailure(final Throwable throwable) {
                 log.error("Inventory task execute failed.", throwable);
-                stop();
                 jobContext.setStatus(JobStatus.EXECUTE_INVENTORY_TASK_FAILURE);
-                ScalingReleaseDatabaseLevelLockEvent event = new ScalingReleaseDatabaseLevelLockEvent(jobContext.getJobConfig().getDatabaseName());
-                ShardingSphereEventBus.getInstance().post(event);
+                stop();
             }
         };
     }
@@ -167,10 +161,8 @@ public final class RuleAlteredJobScheduler implements Runnable {
             @Override
             public void onFailure(final Throwable throwable) {
                 log.error("Incremental task execute failed.", throwable);
-                stop();
                 jobContext.setStatus(JobStatus.EXECUTE_INCREMENTAL_TASK_FAILURE);
-                ScalingReleaseDatabaseLevelLockEvent event = new ScalingReleaseDatabaseLevelLockEvent(jobContext.getJobConfig().getDatabaseName());
-                ShardingSphereEventBus.getInstance().post(event);
+                stop();
             }
         };
     }
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobSchedulerCenter.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobSchedulerCenter.java
index 24bf821128f..6b0f22b27b6 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobSchedulerCenter.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobSchedulerCenter.java
@@ -85,15 +85,6 @@ public final class RuleAlteredJobSchedulerCenter {
         JOB_SCHEDULER_MAP.remove(jobId);
     }
     
-    /**
-     * Check whether the same job exists.
-     * @param jobId job id
-     * @return exist then true else false
-     */
-    public static boolean existJob(final String jobId) {
-        return JOB_SCHEDULER_MAP.containsKey(jobId);
-    }
-    
     /**
      * Update job status for all job sharding.
      *
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobWorker.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobWorker.java
index 094a0d07f05..46227470e7d 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobWorker.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobWorker.java
@@ -32,27 +32,21 @@ import org.apache.shardingsphere.data.pipeline.api.datasource.config.impl.Shardi
 import org.apache.shardingsphere.data.pipeline.api.datasource.config.yaml.YamlPipelineDataSourceConfiguration;
 import org.apache.shardingsphere.data.pipeline.api.job.JobStatus;
 import org.apache.shardingsphere.data.pipeline.api.pojo.JobInfo;
-import org.apache.shardingsphere.data.pipeline.core.context.PipelineContext;
 import org.apache.shardingsphere.data.pipeline.core.exception.PipelineJobCreationException;
 import org.apache.shardingsphere.data.pipeline.core.execute.FinishedCheckJobExecutor;
 import org.apache.shardingsphere.data.pipeline.core.execute.PipelineJobExecutor;
-import org.apache.shardingsphere.data.pipeline.core.lock.PipelineSimpleLock;
 import org.apache.shardingsphere.data.pipeline.spi.rulealtered.RuleAlteredDetector;
 import org.apache.shardingsphere.data.pipeline.spi.rulealtered.RuleAlteredDetectorFactory;
 import org.apache.shardingsphere.data.pipeline.spi.rulealtered.RuleAlteredJobConfigurationPreparerFactory;
 import org.apache.shardingsphere.infra.config.RuleConfiguration;
 import org.apache.shardingsphere.infra.config.rulealtered.OnRuleAlteredActionConfiguration;
 import org.apache.shardingsphere.infra.eventbus.ShardingSphereEventBus;
-import org.apache.shardingsphere.infra.lock.LockContext;
-import org.apache.shardingsphere.infra.lock.LockNameDefinition;
 import org.apache.shardingsphere.infra.yaml.config.pojo.YamlRootConfiguration;
 import org.apache.shardingsphere.infra.yaml.config.pojo.YamlRuleConfiguration;
 import org.apache.shardingsphere.infra.yaml.config.swapper.YamlRuleConfigurationSwapperEngine;
 import org.apache.shardingsphere.infra.yaml.engine.YamlEngine;
 import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.cache.event.StartScalingEvent;
-import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.config.event.rule.ScalingReleaseDatabaseLevelLockEvent;
 import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.config.event.rule.ScalingTaskFinishedEvent;
-import org.apache.shardingsphere.mode.manager.lock.definition.LockNameDefinitionFactory;
 
 import java.util.Collection;
 import java.util.HashMap;
@@ -174,7 +168,6 @@ public final class RuleAlteredJobWorker {
             log.info("Switch rule configuration immediately.");
             ScalingTaskFinishedEvent taskFinishedEvent = new ScalingTaskFinishedEvent(event.getDatabaseName(), event.getActiveVersion(), event.getNewVersion());
             ShardingSphereEventBus.getInstance().post(taskFinishedEvent);
-            ShardingSphereEventBus.getInstance().post(new ScalingReleaseDatabaseLevelLockEvent(event.getDatabaseName()));
         }
     }
     
@@ -276,32 +269,4 @@ public final class RuleAlteredJobWorker {
         }
         return result;
     }
-    
-    /**
-     * scaling release database level lock.
-     *
-     * @param event scaling release database level lock event
-     */
-    @Subscribe
-    public void scalingReleaseDatabaseLevelLock(final ScalingReleaseDatabaseLevelLockEvent event) {
-        String databaseName = event.getDatabaseName();
-        try {
-            restoreSourceWriting(databaseName);
-            // CHECKSTYLE:OFF
-        } catch (final RuntimeException ex) {
-            // CHECKSTYLE:ON
-            log.error("restore source writing failed, databaseName={}", databaseName, ex);
-        }
-        PipelineSimpleLock.getInstance().releaseLock(event.getDatabaseName());
-    }
-    
-    private void restoreSourceWriting(final String databaseName) {
-        log.info("restoreSourceWriting, databaseName={}", databaseName);
-        LockContext lockContext = PipelineContext.getContextManager().getInstanceContext().getLockContext();
-        LockNameDefinition lockNameDefinition = LockNameDefinitionFactory.newDatabaseDefinition(databaseName);
-        if (lockContext.isLocked(lockNameDefinition)) {
-            log.info("Source writing is still stopped on database '{}', restore it now", databaseName);
-            lockContext.releaseLock(lockNameDefinition);
-        }
-    }
 }
diff --git a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/config/event/rule/ScalingReleaseDatabaseLevelLockEvent.java b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/config/event/rule/ScalingReleaseDatabaseLevelLockEvent.java
deleted file mode 100644
index 08232f14441..00000000000
--- a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/config/event/rule/ScalingReleaseDatabaseLevelLockEvent.java
+++ /dev/null
@@ -1,31 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.config.event.rule;
-
-import lombok.Getter;
-import lombok.RequiredArgsConstructor;
-
-/**
- * Scaling release database level lock event.
- */
-@RequiredArgsConstructor
-@Getter
-public final class ScalingReleaseDatabaseLevelLockEvent {
-    
-    private final String databaseName;
-}