You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@shardingsphere.apache.org by zh...@apache.org on 2022/06/20 08:21:16 UTC
[shardingsphere] branch master updated: Fix scaling job prepare failed when frequent restarting job (#18444)
This is an automated email from the ASF dual-hosted git repository.
zhonghongsheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new 93494cb6a9f Fix scaling job prepare failed when frequent restarting job (#18444)
93494cb6a9f is described below
commit 93494cb6a9fbaf92a1a65f1d2decaf6b125ca07a
Author: azexcy <10...@users.noreply.github.com>
AuthorDate: Mon Jun 20 16:21:08 2022 +0800
Fix scaling job prepare failed when frequent restarting job (#18444)
---
.../core/exception/PipelineIgnoredException.java | 35 ++++++++++++++++++++++
.../rulealtered/RuleAlteredJobPreparer.java | 5 ++--
.../rulealtered/RuleAlteredJobScheduler.java | 5 ++++
3 files changed, 43 insertions(+), 2 deletions(-)
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/exception/PipelineIgnoredException.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/exception/PipelineIgnoredException.java
new file mode 100644
index 00000000000..3788c15a6c2
--- /dev/null
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/exception/PipelineIgnoredException.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.data.pipeline.core.exception;
+
+/**
+ * Pipeline ignored exception, signalling that a pipeline job step was abandoned for an expected reason (e.g. the job is stopping) rather than failed.
+ */
+public final class PipelineIgnoredException extends RuntimeException {
+
+ private static final long serialVersionUID = -606723977923290937L;
+
+ public PipelineIgnoredException(final String message) {
+ super(message);
+ }
+
+ public PipelineIgnoredException(final String message, final Throwable cause) {
+ super(message, cause);
+ }
+}
+
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobPreparer.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobPreparer.java
index 54e36d99597..4408e275d14 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobPreparer.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobPreparer.java
@@ -30,6 +30,7 @@ import org.apache.shardingsphere.data.pipeline.api.job.JobStatus;
import org.apache.shardingsphere.data.pipeline.api.job.progress.JobProgress;
import org.apache.shardingsphere.data.pipeline.core.context.PipelineContext;
import org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceManager;
+import org.apache.shardingsphere.data.pipeline.core.exception.PipelineIgnoredException;
import org.apache.shardingsphere.data.pipeline.core.exception.PipelineJobPrepareFailedException;
import org.apache.shardingsphere.data.pipeline.core.execute.ExecuteEngine;
import org.apache.shardingsphere.data.pipeline.core.ingest.position.PositionInitializerFactory;
@@ -77,11 +78,11 @@ public final class RuleAlteredJobPreparer {
public void prepare(final RuleAlteredJobContext jobContext) {
checkSourceDataSource(jobContext);
if (jobContext.isStopping()) {
- throw new PipelineJobPrepareFailedException("Job stopping, jobId=" + jobContext.getJobId());
+ throw new PipelineIgnoredException("Job stopping, jobId=" + jobContext.getJobId());
}
prepareAndCheckTargetWithLock(jobContext);
if (jobContext.isStopping()) {
- throw new PipelineJobPrepareFailedException("Job stopping, jobId=" + jobContext.getJobId());
+ throw new PipelineIgnoredException("Job stopping, jobId=" + jobContext.getJobId());
}
// TODO check metadata
try {
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobScheduler.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobScheduler.java
index 3ef2211ea35..23bc6d15159 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobScheduler.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobScheduler.java
@@ -24,6 +24,7 @@ import org.apache.shardingsphere.data.pipeline.api.ingest.position.FinishedPosit
import org.apache.shardingsphere.data.pipeline.api.job.JobStatus;
import org.apache.shardingsphere.data.pipeline.core.api.GovernanceRepositoryAPI;
import org.apache.shardingsphere.data.pipeline.core.api.PipelineAPIFactory;
+import org.apache.shardingsphere.data.pipeline.core.exception.PipelineIgnoredException;
import org.apache.shardingsphere.data.pipeline.core.execute.ExecuteCallback;
import org.apache.shardingsphere.data.pipeline.core.task.IncrementalTask;
import org.apache.shardingsphere.data.pipeline.core.task.InventoryTask;
@@ -72,6 +73,10 @@ public final class RuleAlteredJobScheduler implements Runnable {
GovernanceRepositoryAPI governanceRepositoryAPI = PipelineAPIFactory.getGovernanceRepositoryAPI();
try {
jobContext.getJobPreparer().prepare(jobContext);
+ } catch (final PipelineIgnoredException ex) {
+ log.info("pipeline ignore exception: {}", ex.getMessage());
+ RuleAlteredJobCenter.stop(jobId);
+ ShardingSphereEventBus.getInstance().post(new ScalingReleaseDatabaseLevelLockEvent(jobContext.getJobConfig().getDatabaseName()));
// CHECKSTYLE:OFF
} catch (final RuntimeException ex) {
// CHECKSTYLE:ON