You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@shardingsphere.apache.org by du...@apache.org on 2022/09/23 01:05:02 UTC
[shardingsphere] branch master updated: Review and improve pipeline code (#21139)
This is an automated email from the ASF dual-hosted git repository.
duanzhengqiang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new 98650b386b8 Review and improve pipeline code (#21139)
98650b386b8 is described below
commit 98650b386b851dffe87154e43d5440cebda4191a
Author: Hongsheng Zhong <zh...@apache.org>
AuthorDate: Fri Sep 23 09:04:54 2022 +0800
Review and improve pipeline code (#21139)
* Review and improve pipeline code
* Rename getJobRootClassName
---
.../shardingsphere/data/pipeline/api/pojo/PipelineJobInfo.java | 2 ++
.../data/pipeline/core/api/impl/AbstractPipelineJobAPIImpl.java | 6 ++++--
.../data/pipeline/core/prepare/InventoryTaskSplitter.java | 9 +++++++++
.../migration/MigrationChangedJobConfigurationProcessor.java | 6 ++++--
.../data/pipeline/scenario/migration/MigrationJobAPIImpl.java | 5 +++++
5 files changed, 24 insertions(+), 4 deletions(-)
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/pojo/PipelineJobInfo.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/pojo/PipelineJobInfo.java
index 0af0a6260ee..8aed52442b2 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/pojo/PipelineJobInfo.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/pojo/PipelineJobInfo.java
@@ -38,4 +38,6 @@ public abstract class PipelineJobInfo {
private String createTime;
private String stopTime;
+
+ private transient String jobParameter;
}
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/AbstractPipelineJobAPIImpl.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/AbstractPipelineJobAPIImpl.java
index 235ff348eac..08d41cb5d66 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/AbstractPipelineJobAPIImpl.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/AbstractPipelineJobAPIImpl.java
@@ -35,7 +35,6 @@ import org.apache.shardingsphere.data.pipeline.core.exception.job.PipelineJobNot
import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobIdUtils;
import org.apache.shardingsphere.data.pipeline.core.metadata.node.PipelineMetaDataNode;
import org.apache.shardingsphere.data.pipeline.core.util.PipelineDistributedBarrier;
-import org.apache.shardingsphere.data.pipeline.scenario.migration.MigrationJob;
import org.apache.shardingsphere.elasticjob.infra.pojo.JobConfigurationPOJO;
import org.apache.shardingsphere.elasticjob.lite.lifecycle.domain.JobBriefInfo;
import org.apache.shardingsphere.infra.config.mode.ModeConfiguration;
@@ -91,6 +90,7 @@ public abstract class AbstractPipelineJobAPIImpl implements PipelineJobAPI {
jobInfo.setShardingTotalCount(jobConfigPOJO.getShardingTotalCount());
jobInfo.setCreateTime(jobConfigPOJO.getProps().getProperty("create_time"));
jobInfo.setStopTime(jobConfigPOJO.getProps().getProperty("stop_time"));
+ jobInfo.setJobParameter(jobConfigPOJO.getJobParameter());
}
@Override
@@ -104,11 +104,13 @@ public abstract class AbstractPipelineJobAPIImpl implements PipelineJobAPI {
log.warn("jobId already exists in registry center, ignore, jobConfigKey={}", jobConfigKey);
return Optional.of(jobId);
}
- repositoryAPI.persist(PipelineMetaDataNode.getJobRootPath(jobId), MigrationJob.class.getName());
+ repositoryAPI.persist(PipelineMetaDataNode.getJobRootPath(jobId), getJobClassName());
repositoryAPI.persist(jobConfigKey, convertJobConfigurationToText(jobConfig));
return Optional.of(jobId);
}
+ protected abstract String getJobClassName();
+
private String convertJobConfigurationToText(final PipelineJobConfiguration jobConfig) {
JobConfigurationPOJO jobConfigPOJO = new JobConfigurationPOJO();
jobConfigPOJO.setJobName(jobConfig.getJobId());
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/prepare/InventoryTaskSplitter.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/prepare/InventoryTaskSplitter.java
index 2e72e3e2451..c6030f6cc04 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/prepare/InventoryTaskSplitter.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/prepare/InventoryTaskSplitter.java
@@ -33,11 +33,13 @@ import org.apache.shardingsphere.data.pipeline.api.job.JobStatus;
import org.apache.shardingsphere.data.pipeline.api.job.progress.InventoryIncrementalJobItemProgress;
import org.apache.shardingsphere.data.pipeline.api.metadata.LogicTableName;
import org.apache.shardingsphere.data.pipeline.api.metadata.loader.PipelineTableMetaDataLoader;
+import org.apache.shardingsphere.data.pipeline.api.metadata.model.PipelineColumnMetaData;
import org.apache.shardingsphere.data.pipeline.core.context.InventoryIncrementalJobItemContext;
import org.apache.shardingsphere.data.pipeline.core.context.InventoryIncrementalProcessContext;
import org.apache.shardingsphere.data.pipeline.core.exception.job.SplitPipelineJobByUniqueKeyException;
import org.apache.shardingsphere.data.pipeline.core.exception.job.SplitPipelineJobByRangeException;
import org.apache.shardingsphere.data.pipeline.core.execute.ExecuteEngine;
+import org.apache.shardingsphere.data.pipeline.core.metadata.loader.PipelineTableMetaDataUtil;
import org.apache.shardingsphere.data.pipeline.core.sqlbuilder.PipelineSQLBuilderFactory;
import org.apache.shardingsphere.data.pipeline.core.task.InventoryTask;
import org.apache.shardingsphere.data.pipeline.core.util.PipelineJdbcUtils;
@@ -115,6 +117,13 @@ public final class InventoryTaskSplitter {
private Collection<InventoryDumperConfiguration> splitByPrimaryKey(final InventoryDumperConfiguration dumperConfig, final InventoryIncrementalJobItemContext jobItemContext,
final DataSource dataSource) {
+ if (null == dumperConfig.getUniqueKey()) {
+ String schemaName = dumperConfig.getSchemaName(new LogicTableName(dumperConfig.getLogicTableName()));
+ String actualTableName = dumperConfig.getActualTableName();
+ PipelineColumnMetaData uniqueKeyColumn = PipelineTableMetaDataUtil.getUniqueKeyColumn(schemaName, actualTableName, metaDataLoader);
+ dumperConfig.setUniqueKey(uniqueKeyColumn.getName());
+ dumperConfig.setUniqueKeyDataType(uniqueKeyColumn.getDataType());
+ }
Collection<InventoryDumperConfiguration> result = new LinkedList<>();
InventoryIncrementalProcessContext jobProcessContext = jobItemContext.getJobProcessContext();
PipelineReadConfiguration readConfig = jobProcessContext.getPipelineProcessConfig().getRead();
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationChangedJobConfigurationProcessor.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationChangedJobConfigurationProcessor.java
index b54a5a493c8..d1922948cf6 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationChangedJobConfigurationProcessor.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationChangedJobConfigurationProcessor.java
@@ -52,7 +52,9 @@ public final class MigrationChangedJobConfigurationProcessor implements Pipeline
log.info("{} added to executing jobs failed since it already exists", jobId);
} else {
log.info("{} executing jobs", jobId);
- CompletableFuture.runAsync(() -> execute(jobConfigPOJO), PipelineContext.getEventListenerExecutor());
+ CompletableFuture.runAsync(() -> execute(jobConfigPOJO), PipelineContext.getEventListenerExecutor()).whenComplete((unused, throwable) -> {
+ log.error("execute failed, jobId={}", jobId, throwable);
+ });
}
break;
case DELETED:
@@ -70,8 +72,8 @@ public final class MigrationChangedJobConfigurationProcessor implements Pipeline
MigrationJob job = new MigrationJob();
PipelineJobCenter.addJob(jobConfigPOJO.getJobName(), job);
OneOffJobBootstrap oneOffJobBootstrap = new OneOffJobBootstrap(PipelineAPIFactory.getRegistryCenter(), job, jobConfigPOJO.toJobConfiguration());
- oneOffJobBootstrap.execute();
job.setOneOffJobBootstrap(oneOffJobBootstrap);
+ oneOffJobBootstrap.execute();
}
@Override
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobAPIImpl.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobAPIImpl.java
index 27ef7c8f539..b4419dafa05 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobAPIImpl.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobAPIImpl.java
@@ -183,6 +183,11 @@ public final class MigrationJobAPIImpl extends InventoryIncrementalJobPublicAPII
return marshalJobId(jobId);
}
+ @Override
+ protected String getJobClassName() {
+ return MigrationJob.class.getName();
+ }
+
@Override
protected YamlPipelineJobConfiguration swapToYamlJobConfiguration(final PipelineJobConfiguration jobConfig) {
return new YamlMigrationJobConfigurationSwapper().swapToYamlConfiguration((MigrationJobConfiguration) jobConfig);