You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@shardingsphere.apache.org by yx...@apache.org on 2022/11/01 01:50:21 UTC
[shardingsphere] branch master updated: Extract more methods to AbstractPipelineJob and AbstractSimplePipelineJob for common usage (#21875)
This is an automated email from the ASF dual-hosted git repository.
yx9o pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new 6bb74c3e8b6 Extract more methods to AbstractPipelineJob and AbstractSimplePipelineJob for common usage (#21875)
6bb74c3e8b6 is described below
commit 6bb74c3e8b667ee92805706c4f82f01561e2c028
Author: Hongsheng Zhong <zh...@apache.org>
AuthorDate: Tue Nov 1 09:50:14 2022 +0800
Extract more methods to AbstractPipelineJob and AbstractSimplePipelineJob for common usage (#21875)
* Add missed COUNT(1) in buildSplitByPrimaryKeyRangeSQL of Oracle
* Extract stop() to AbstractPipelineJob
* Extract prepare() to AbstractPipelineJob
* Extract execute() to AbstractSimplePipelineJob
* Extract shardingItem duplicated check to AbstractPipelineJob; Remove runInBackground
* Clean Getter Setter
---
.../pipeline/core/job/AbstractPipelineJob.java | 80 ++++++++++++++++++----
.../core/job/AbstractSimplePipelineJob.java | 56 +++++++++++++++
.../core/sqlbuilder/OraclePipelineSQLBuilder.java | 2 +-
.../consistencycheck/ConsistencyCheckJob.java | 53 ++++----------
.../pipeline/scenario/migration/MigrationJob.java | 77 +++++----------------
5 files changed, 154 insertions(+), 114 deletions(-)
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractPipelineJob.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractPipelineJob.java
index 10d59e68431..03ca0efd927 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractPipelineJob.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractPipelineJob.java
@@ -20,8 +20,13 @@ package org.apache.shardingsphere.data.pipeline.core.job;
import lombok.AccessLevel;
import lombok.Getter;
import lombok.Setter;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.shardingsphere.data.pipeline.api.context.PipelineJobItemContext;
+import org.apache.shardingsphere.data.pipeline.api.job.JobStatus;
import org.apache.shardingsphere.data.pipeline.api.job.PipelineJob;
import org.apache.shardingsphere.data.pipeline.api.task.PipelineTasksRunner;
+import org.apache.shardingsphere.data.pipeline.core.api.PipelineAPIFactory;
+import org.apache.shardingsphere.data.pipeline.core.api.PipelineJobAPI;
import org.apache.shardingsphere.data.pipeline.core.job.progress.persist.PipelineJobProgressPersistService;
import org.apache.shardingsphere.data.pipeline.core.metadata.node.PipelineMetaDataNode;
import org.apache.shardingsphere.data.pipeline.core.util.PipelineDistributedBarrier;
@@ -36,27 +41,53 @@ import java.util.concurrent.ConcurrentHashMap;
/**
* Abstract pipeline job.
*/
-@Getter
+@Slf4j
public abstract class AbstractPipelineJob implements PipelineJob {
- @Setter
+ @Getter
private volatile String jobId;
- @Setter
+ @Getter(value = AccessLevel.PROTECTED)
+ private volatile PipelineJobAPI jobAPI;
+
+ @Getter
private volatile boolean stopping;
@Setter
private volatile JobBootstrap jobBootstrap;
- @Getter(value = AccessLevel.PRIVATE)
private final Map<Integer, PipelineTasksRunner> tasksRunnerMap = new ConcurrentHashMap<>();
private final PipelineDistributedBarrier distributedBarrier = PipelineDistributedBarrier.getInstance();
- protected void runInBackground(final Runnable runnable) {
- new Thread(runnable).start();
+ protected void setJobId(final String jobId) {
+ this.jobId = jobId;
+ jobAPI = PipelineAPIFactory.getPipelineJobAPI(PipelineJobIdUtils.parseJobType(jobId));
+ }
+
+ protected void prepare(final PipelineJobItemContext jobItemContext) {
+ try {
+ long startTimeMillis = System.currentTimeMillis();
+ doPrepare(jobItemContext);
+ log.info("prepare cost {} ms", System.currentTimeMillis() - startTimeMillis);
+ // CHECKSTYLE:OFF
+ } catch (final Exception ex) {
+ // CHECKSTYLE:ON
+ String jobId = jobItemContext.getJobId();
+ log.error("job prepare failed, {}-{}", jobId, jobItemContext.getShardingItem(), ex);
+ jobItemContext.setStatus(JobStatus.PREPARING_FAILURE);
+ jobAPI.persistJobItemProgress(jobItemContext);
+ jobAPI.persistJobItemErrorMessage(jobItemContext.getJobId(), jobItemContext.getShardingItem(), ex);
+ jobAPI.stop(jobId);
+ if (ex instanceof RuntimeException) {
+ throw (RuntimeException) ex;
+ }
+ throw new RuntimeException(ex);
+ }
}
+ protected abstract void doPrepare(PipelineJobItemContext jobItemContext) throws Exception;
+
@Override
public Optional<PipelineTasksRunner> getTasksRunner(final int shardingItem) {
return Optional.ofNullable(tasksRunnerMap.get(shardingItem));
@@ -67,22 +98,41 @@ public abstract class AbstractPipelineJob implements PipelineJob {
return new ArrayList<>(tasksRunnerMap.keySet());
}
- protected void addTasksRunner(final int shardingItem, final PipelineTasksRunner tasksRunner) {
- tasksRunnerMap.put(shardingItem, tasksRunner);
+ protected boolean addTasksRunner(final int shardingItem, final PipelineTasksRunner tasksRunner) {
+ if (null != tasksRunnerMap.putIfAbsent(shardingItem, tasksRunner)) {
+ log.warn("shardingItem {} tasks runner exists, ignore", shardingItem);
+ return false;
+ }
PipelineJobProgressPersistService.addJobProgressPersistContext(getJobId(), shardingItem);
distributedBarrier.persistEphemeralChildrenNode(PipelineMetaDataNode.getJobBarrierEnablePath(getJobId()), shardingItem);
+ return true;
}
- protected boolean containsTasksRunner(final int shardingItem) {
- return tasksRunnerMap.containsKey(shardingItem);
+ @Override
+ public void stop() {
+ try {
+ innerStop();
+ } finally {
+ innerClean();
+ doClean();
+ }
}
- protected void clearTasksRunner() {
- tasksRunnerMap.clear();
- PipelineJobProgressPersistService.removeJobProgressPersistContext(jobId);
+ private void innerStop() {
+ stopping = true;
+ if (null != jobBootstrap) {
+ jobBootstrap.shutdown();
+ }
+ log.info("stop tasks runner, jobId={}", getJobId());
+ for (PipelineTasksRunner each : tasksRunnerMap.values()) {
+ each.stop();
+ }
}
- protected Collection<PipelineTasksRunner> getTasksRunners() {
- return tasksRunnerMap.values();
+ private void innerClean() {
+ tasksRunnerMap.clear();
+ PipelineJobProgressPersistService.removeJobProgressPersistContext(getJobId());
}
+
+ protected abstract void doClean();
}
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractSimplePipelineJob.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractSimplePipelineJob.java
new file mode 100644
index 00000000000..9e26b190c43
--- /dev/null
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractSimplePipelineJob.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.data.pipeline.core.job;
+
+import lombok.extern.slf4j.Slf4j;
+import org.apache.shardingsphere.data.pipeline.api.context.PipelineJobItemContext;
+import org.apache.shardingsphere.data.pipeline.api.task.PipelineTasksRunner;
+import org.apache.shardingsphere.elasticjob.api.ShardingContext;
+import org.apache.shardingsphere.elasticjob.simple.job.SimpleJob;
+
+/**
+ * Abstract simple pipeline job.
+ */
+@Slf4j
+public abstract class AbstractSimplePipelineJob extends AbstractPipelineJob implements SimpleJob {
+
+ protected abstract PipelineJobItemContext buildPipelineJobItemContext(ShardingContext shardingContext);
+
+ protected abstract PipelineTasksRunner buildPipelineTasksRunner(PipelineJobItemContext pipelineJobItemContext);
+
+ @Override
+ public void execute(final ShardingContext shardingContext) {
+ String jobId = shardingContext.getJobName();
+ int shardingItem = shardingContext.getShardingItem();
+ log.info("Execute job {}-{}", jobId, shardingItem);
+ if (isStopping()) {
+ log.info("stopping true, ignore");
+ return;
+ }
+ setJobId(jobId);
+ PipelineJobItemContext jobItemContext = buildPipelineJobItemContext(shardingContext);
+ PipelineTasksRunner tasksRunner = buildPipelineTasksRunner(jobItemContext);
+ if (!addTasksRunner(shardingItem, tasksRunner)) {
+ return;
+ }
+ getJobAPI().cleanJobItemErrorMessage(jobId, jobItemContext.getShardingItem());
+ prepare(jobItemContext);
+ log.info("start tasks runner, jobId={}, shardingItem={}", jobId, shardingItem);
+ tasksRunner.start();
+ }
+}
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/sqlbuilder/OraclePipelineSQLBuilder.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/sqlbuilder/OraclePipelineSQLBuilder.java
index 14939768a4e..67a4e4716eb 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/sqlbuilder/OraclePipelineSQLBuilder.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/sqlbuilder/OraclePipelineSQLBuilder.java
@@ -69,7 +69,7 @@ public final class OraclePipelineSQLBuilder extends AbstractPipelineSQLBuilder {
public String buildSplitByPrimaryKeyRangeSQL(final String schemaName, final String tableName, final String primaryKey) {
String qualifiedTableName = getQualifiedTableName(schemaName, tableName);
String quotedUniqueKey = quote(primaryKey);
- return String.format("SELECT MAX(%s) FROM (SELECT * FROM (SELECT %s FROM %s WHERE %s>=? ORDER BY %s) WHERE ROWNUM<=?) t",
+ return String.format("SELECT MAX(%s), COUNT(1) FROM (SELECT * FROM (SELECT %s FROM %s WHERE %s>=? ORDER BY %s) WHERE ROWNUM<=?) t",
quotedUniqueKey, quotedUniqueKey, qualifiedTableName, quotedUniqueKey, quotedUniqueKey);
}
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJob.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJob.java
index 19b866f8641..ac8f1f43580 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJob.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJob.java
@@ -19,58 +19,35 @@ package org.apache.shardingsphere.data.pipeline.scenario.consistencycheck;
import lombok.extern.slf4j.Slf4j;
import org.apache.shardingsphere.data.pipeline.api.config.job.ConsistencyCheckJobConfiguration;
+import org.apache.shardingsphere.data.pipeline.api.context.PipelineJobItemContext;
import org.apache.shardingsphere.data.pipeline.api.job.JobStatus;
-import org.apache.shardingsphere.data.pipeline.api.job.PipelineJob;
import org.apache.shardingsphere.data.pipeline.api.task.PipelineTasksRunner;
-import org.apache.shardingsphere.data.pipeline.core.job.AbstractPipelineJob;
+import org.apache.shardingsphere.data.pipeline.core.job.AbstractSimplePipelineJob;
import org.apache.shardingsphere.data.pipeline.yaml.job.YamlConsistencyCheckJobConfigurationSwapper;
import org.apache.shardingsphere.elasticjob.api.ShardingContext;
-import org.apache.shardingsphere.elasticjob.simple.job.SimpleJob;
/**
* Consistency check job.
*/
@Slf4j
-public final class ConsistencyCheckJob extends AbstractPipelineJob implements SimpleJob, PipelineJob {
-
- private final ConsistencyCheckJobAPI jobAPI = ConsistencyCheckJobAPIFactory.getInstance();
+public final class ConsistencyCheckJob extends AbstractSimplePipelineJob {
@Override
- public void execute(final ShardingContext shardingContext) {
- String checkJobId = shardingContext.getJobName();
- int shardingItem = shardingContext.getShardingItem();
- log.info("Execute job {}-{}", checkJobId, shardingItem);
- if (isStopping()) {
- log.info("stopping true, ignore");
- return;
- }
- setJobId(checkJobId);
+ protected ConsistencyCheckJobItemContext buildPipelineJobItemContext(final ShardingContext shardingContext) {
ConsistencyCheckJobConfiguration jobConfig = new YamlConsistencyCheckJobConfigurationSwapper().swapToObject(shardingContext.getJobParameter());
- ConsistencyCheckJobItemContext jobItemContext = new ConsistencyCheckJobItemContext(jobConfig, shardingItem, JobStatus.RUNNING);
- if (containsTasksRunner(shardingItem)) {
- log.warn("tasksRunnerMap contains shardingItem {}, ignore", shardingItem);
- return;
- }
- log.info("start tasks runner, jobId={}, shardingItem={}", getJobId(), shardingItem);
- jobAPI.cleanJobItemErrorMessage(jobItemContext.getJobId(), jobItemContext.getShardingItem());
- ConsistencyCheckTasksRunner tasksRunner = new ConsistencyCheckTasksRunner(jobItemContext);
- tasksRunner.start();
- addTasksRunner(shardingItem, tasksRunner);
+ return new ConsistencyCheckJobItemContext(jobConfig, shardingContext.getShardingItem(), JobStatus.RUNNING);
+ }
+
+ @Override
+ protected PipelineTasksRunner buildPipelineTasksRunner(final PipelineJobItemContext pipelineJobItemContext) {
+ return new ConsistencyCheckTasksRunner((ConsistencyCheckJobItemContext) pipelineJobItemContext);
+ }
+
+ @Override
+ protected void doPrepare(final PipelineJobItemContext jobItemContext) {
}
@Override
- public void stop() {
- setStopping(true);
- if (null != getJobBootstrap()) {
- getJobBootstrap().shutdown();
- }
- if (null == getJobId()) {
- log.info("stop consistency check job, jobId is null, ignore");
- return;
- }
- for (PipelineTasksRunner each : getTasksRunners()) {
- each.stop();
- }
- clearTasksRunner();
+ protected void doClean() {
}
}
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJob.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJob.java
index 975e72d1f6b..cdf071c6226 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJob.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJob.java
@@ -20,16 +20,16 @@ package org.apache.shardingsphere.data.pipeline.scenario.migration;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.shardingsphere.data.pipeline.api.config.job.MigrationJobConfiguration;
+import org.apache.shardingsphere.data.pipeline.api.context.PipelineJobItemContext;
import org.apache.shardingsphere.data.pipeline.api.datasource.PipelineDataSourceManager;
-import org.apache.shardingsphere.data.pipeline.api.job.JobStatus;
import org.apache.shardingsphere.data.pipeline.api.job.progress.InventoryIncrementalJobItemProgress;
import org.apache.shardingsphere.data.pipeline.api.task.PipelineTasksRunner;
+import org.apache.shardingsphere.data.pipeline.core.context.InventoryIncrementalJobItemContext;
import org.apache.shardingsphere.data.pipeline.core.datasource.DefaultPipelineDataSourceManager;
-import org.apache.shardingsphere.data.pipeline.core.job.AbstractPipelineJob;
+import org.apache.shardingsphere.data.pipeline.core.job.AbstractSimplePipelineJob;
import org.apache.shardingsphere.data.pipeline.core.task.InventoryIncrementalTasksRunner;
import org.apache.shardingsphere.data.pipeline.yaml.job.YamlMigrationJobConfigurationSwapper;
import org.apache.shardingsphere.elasticjob.api.ShardingContext;
-import org.apache.shardingsphere.elasticjob.simple.job.SimpleJob;
import java.sql.SQLException;
@@ -38,7 +38,7 @@ import java.sql.SQLException;
*/
@RequiredArgsConstructor
@Slf4j
-public final class MigrationJob extends AbstractPipelineJob implements SimpleJob {
+public final class MigrationJob extends AbstractSimplePipelineJob {
private final MigrationJobAPI jobAPI = MigrationJobAPIFactory.getInstance();
@@ -48,71 +48,28 @@ public final class MigrationJob extends AbstractPipelineJob implements SimpleJob
private final MigrationJobPreparer jobPreparer = new MigrationJobPreparer();
@Override
- public void execute(final ShardingContext shardingContext) {
+ protected InventoryIncrementalJobItemContext buildPipelineJobItemContext(final ShardingContext shardingContext) {
int shardingItem = shardingContext.getShardingItem();
- log.info("Execute job {}-{}", shardingContext.getJobName(), shardingItem);
- if (isStopping()) {
- log.info("stopping true, ignore");
- return;
- }
- setJobId(shardingContext.getJobName());
MigrationJobConfiguration jobConfig = new YamlMigrationJobConfigurationSwapper().swapToObject(shardingContext.getJobParameter());
InventoryIncrementalJobItemProgress initProgress = jobAPI.getJobItemProgress(shardingContext.getJobName(), shardingItem);
MigrationProcessContext jobProcessContext = jobAPI.buildPipelineProcessContext(jobConfig);
MigrationTaskConfiguration taskConfig = jobAPI.buildTaskConfiguration(jobConfig, shardingItem, jobProcessContext.getPipelineProcessConfig());
- MigrationJobItemContext jobItemContext = new MigrationJobItemContext(jobConfig, shardingItem, initProgress, jobProcessContext, taskConfig, dataSourceManager);
- if (containsTasksRunner(shardingItem)) {
- log.warn("tasksRunnerMap contains shardingItem {}, ignore", shardingItem);
- return;
- }
- log.info("start tasks runner, jobId={}, shardingItem={}", getJobId(), shardingItem);
- jobAPI.cleanJobItemErrorMessage(jobItemContext.getJobId(), jobItemContext.getShardingItem());
- InventoryIncrementalTasksRunner tasksRunner = new InventoryIncrementalTasksRunner(jobItemContext, jobItemContext.getInventoryTasks(), jobItemContext.getIncrementalTasks());
- runInBackground(() -> {
- prepare(jobItemContext);
- tasksRunner.start();
- });
- addTasksRunner(shardingItem, tasksRunner);
+ return new MigrationJobItemContext(jobConfig, shardingItem, initProgress, jobProcessContext, taskConfig, dataSourceManager);
}
- private void prepare(final MigrationJobItemContext jobItemContext) {
- try {
- long startTimeMillis = System.currentTimeMillis();
- jobPreparer.prepare(jobItemContext);
- log.info("prepare cost {} ms", System.currentTimeMillis() - startTimeMillis);
- // CHECKSTYLE:OFF
- } catch (final SQLException | RuntimeException ex) {
- // CHECKSTYLE:ON
- log.error("job prepare failed, {}-{}", getJobId(), jobItemContext.getShardingItem(), ex);
- jobAPI.stop(jobItemContext.getJobId());
- jobItemContext.setStatus(JobStatus.PREPARING_FAILURE);
- jobAPI.persistJobItemProgress(jobItemContext);
- jobAPI.persistJobItemErrorMessage(jobItemContext.getJobId(), jobItemContext.getShardingItem(), ex);
- if (ex instanceof RuntimeException) {
- throw (RuntimeException) ex;
- }
- throw new RuntimeException(ex);
- }
+ @Override
+ protected PipelineTasksRunner buildPipelineTasksRunner(final PipelineJobItemContext pipelineJobItemContext) {
+ InventoryIncrementalJobItemContext jobItemContext = (InventoryIncrementalJobItemContext) pipelineJobItemContext;
+ return new InventoryIncrementalTasksRunner(jobItemContext, jobItemContext.getInventoryTasks(), jobItemContext.getIncrementalTasks());
}
- /**
- * Stop job.
- */
- public void stop() {
- setStopping(true);
+ @Override
+ protected void doPrepare(final PipelineJobItemContext jobItemContext) throws SQLException {
+ jobPreparer.prepare((MigrationJobItemContext) jobItemContext);
+ }
+
+ @Override
+ public void doClean() {
dataSourceManager.close();
- if (null != getJobBootstrap()) {
- getJobBootstrap().shutdown();
- }
- String jobId = getJobId();
- if (null == jobId) {
- log.info("stop, jobId is null, ignore");
- return;
- }
- log.info("stop tasks runner, jobId={}", jobId);
- for (PipelineTasksRunner each : getTasksRunners()) {
- each.stop();
- }
- clearTasksRunner();
}
}