You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@shardingsphere.apache.org by zh...@apache.org on 2022/06/20 08:21:16 UTC

[shardingsphere] branch master updated: Fix scaling job prepare failed when frequent restarting job (#18444)

This is an automated email from the ASF dual-hosted git repository.

zhonghongsheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new 93494cb6a9f Fix scaling job prepare failed when frequent restarting job (#18444)
93494cb6a9f is described below

commit 93494cb6a9fbaf92a1a65f1d2decaf6b125ca07a
Author: azexcy <10...@users.noreply.github.com>
AuthorDate: Mon Jun 20 16:21:08 2022 +0800

    Fix scaling job prepare failed when frequent restarting job (#18444)
---
 .../core/exception/PipelineIgnoredException.java   | 35 ++++++++++++++++++++++
 .../rulealtered/RuleAlteredJobPreparer.java        |  5 ++--
 .../rulealtered/RuleAlteredJobScheduler.java       |  5 ++++
 3 files changed, 43 insertions(+), 2 deletions(-)

diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/exception/PipelineIgnoredException.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/exception/PipelineIgnoredException.java
new file mode 100644
index 00000000000..3788c15a6c2
--- /dev/null
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/exception/PipelineIgnoredException.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.data.pipeline.core.exception;
+
+/**
+ * Pipeline ignored exception, thrown to signal a condition the caller may log and skip (e.g. the job is already stopping) rather than report as a failure.
+ */
+public final class PipelineIgnoredException extends RuntimeException {
+    
+    private static final long serialVersionUID = -606723977923290937L;
+    
+    public PipelineIgnoredException(final String message) {
+        super(message);
+    }
+    
+    public PipelineIgnoredException(final String message, final Throwable cause) {
+        super(message, cause);
+    }
+}
+
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobPreparer.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobPreparer.java
index 54e36d99597..4408e275d14 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobPreparer.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobPreparer.java
@@ -30,6 +30,7 @@ import org.apache.shardingsphere.data.pipeline.api.job.JobStatus;
 import org.apache.shardingsphere.data.pipeline.api.job.progress.JobProgress;
 import org.apache.shardingsphere.data.pipeline.core.context.PipelineContext;
 import org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceManager;
+import org.apache.shardingsphere.data.pipeline.core.exception.PipelineIgnoredException;
 import org.apache.shardingsphere.data.pipeline.core.exception.PipelineJobPrepareFailedException;
 import org.apache.shardingsphere.data.pipeline.core.execute.ExecuteEngine;
 import org.apache.shardingsphere.data.pipeline.core.ingest.position.PositionInitializerFactory;
@@ -77,11 +78,11 @@ public final class RuleAlteredJobPreparer {
     public void prepare(final RuleAlteredJobContext jobContext) {
         checkSourceDataSource(jobContext);
         if (jobContext.isStopping()) {
-            throw new PipelineJobPrepareFailedException("Job stopping, jobId=" + jobContext.getJobId());
+            throw new PipelineIgnoredException("Job stopping, jobId=" + jobContext.getJobId());
         }
         prepareAndCheckTargetWithLock(jobContext);
         if (jobContext.isStopping()) {
-            throw new PipelineJobPrepareFailedException("Job stopping, jobId=" + jobContext.getJobId());
+            throw new PipelineIgnoredException("Job stopping, jobId=" + jobContext.getJobId());
         }
         // TODO check metadata
         try {
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobScheduler.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobScheduler.java
index 3ef2211ea35..23bc6d15159 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobScheduler.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobScheduler.java
@@ -24,6 +24,7 @@ import org.apache.shardingsphere.data.pipeline.api.ingest.position.FinishedPosit
 import org.apache.shardingsphere.data.pipeline.api.job.JobStatus;
 import org.apache.shardingsphere.data.pipeline.core.api.GovernanceRepositoryAPI;
 import org.apache.shardingsphere.data.pipeline.core.api.PipelineAPIFactory;
+import org.apache.shardingsphere.data.pipeline.core.exception.PipelineIgnoredException;
 import org.apache.shardingsphere.data.pipeline.core.execute.ExecuteCallback;
 import org.apache.shardingsphere.data.pipeline.core.task.IncrementalTask;
 import org.apache.shardingsphere.data.pipeline.core.task.InventoryTask;
@@ -72,6 +73,10 @@ public final class RuleAlteredJobScheduler implements Runnable {
         GovernanceRepositoryAPI governanceRepositoryAPI = PipelineAPIFactory.getGovernanceRepositoryAPI();
         try {
             jobContext.getJobPreparer().prepare(jobContext);
+        } catch (final PipelineIgnoredException ex) {
+            log.info("pipeline ignore exception: {}", ex.getMessage());
+            RuleAlteredJobCenter.stop(jobId);
+            ShardingSphereEventBus.getInstance().post(new ScalingReleaseDatabaseLevelLockEvent(jobContext.getJobConfig().getDatabaseName()));
             // CHECKSTYLE:OFF
         } catch (final RuntimeException ex) {
             // CHECKSTYLE:ON