Posted to notifications@shardingsphere.apache.org by du...@apache.org on 2022/09/23 01:05:02 UTC

[shardingsphere] branch master updated: Review and improve pipeline code (#21139)

This is an automated email from the ASF dual-hosted git repository.

duanzhengqiang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new 98650b386b8 Review and improve pipeline code (#21139)
98650b386b8 is described below

commit 98650b386b851dffe87154e43d5440cebda4191a
Author: Hongsheng Zhong <zh...@apache.org>
AuthorDate: Fri Sep 23 09:04:54 2022 +0800

    Review and improve pipeline code (#21139)
    
    * Review and improve pipeline code
    
    * Rename getJobRootClassName
---
 .../shardingsphere/data/pipeline/api/pojo/PipelineJobInfo.java   | 2 ++
 .../data/pipeline/core/api/impl/AbstractPipelineJobAPIImpl.java  | 6 ++++--
 .../data/pipeline/core/prepare/InventoryTaskSplitter.java        | 9 +++++++++
 .../migration/MigrationChangedJobConfigurationProcessor.java     | 6 ++++--
 .../data/pipeline/scenario/migration/MigrationJobAPIImpl.java    | 5 +++++
 5 files changed, 24 insertions(+), 4 deletions(-)

diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/pojo/PipelineJobInfo.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/pojo/PipelineJobInfo.java
index 0af0a6260ee..8aed52442b2 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/pojo/PipelineJobInfo.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/pojo/PipelineJobInfo.java
@@ -38,4 +38,6 @@ public abstract class PipelineJobInfo {
     private String createTime;
     
     private String stopTime;
+    
+    private transient String jobParameter;
 }
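
Note: the new jobParameter field is declared transient, so it stays out of Java serialization while still being available on the in-memory POJO; it is populated from JobConfigurationPOJO in AbstractPipelineJobAPIImpl below. A minimal sketch of reading it back, assuming the Lombok-generated getter that matches the class's existing fields (the helper itself is hypothetical, not part of this commit):

    // Hypothetical helper, sketch only: relies on the Lombok-generated
    // getJobParameter() accessor implied by the setJobParameter(...) call below.
    private static String describeJob(final PipelineJobInfo jobInfo) {
        String jobParameter = jobInfo.getJobParameter();
        // transient field: present in memory, skipped by Java serialization.
        return null == jobParameter ? "<no job parameter>" : jobParameter;
    }
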
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/AbstractPipelineJobAPIImpl.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/AbstractPipelineJobAPIImpl.java
index 235ff348eac..08d41cb5d66 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/AbstractPipelineJobAPIImpl.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/AbstractPipelineJobAPIImpl.java
@@ -35,7 +35,6 @@ import org.apache.shardingsphere.data.pipeline.core.exception.job.PipelineJobNot
 import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobIdUtils;
 import org.apache.shardingsphere.data.pipeline.core.metadata.node.PipelineMetaDataNode;
 import org.apache.shardingsphere.data.pipeline.core.util.PipelineDistributedBarrier;
-import org.apache.shardingsphere.data.pipeline.scenario.migration.MigrationJob;
 import org.apache.shardingsphere.elasticjob.infra.pojo.JobConfigurationPOJO;
 import org.apache.shardingsphere.elasticjob.lite.lifecycle.domain.JobBriefInfo;
 import org.apache.shardingsphere.infra.config.mode.ModeConfiguration;
@@ -91,6 +90,7 @@ public abstract class AbstractPipelineJobAPIImpl implements PipelineJobAPI {
         jobInfo.setShardingTotalCount(jobConfigPOJO.getShardingTotalCount());
         jobInfo.setCreateTime(jobConfigPOJO.getProps().getProperty("create_time"));
         jobInfo.setStopTime(jobConfigPOJO.getProps().getProperty("stop_time"));
+        jobInfo.setJobParameter(jobConfigPOJO.getJobParameter());
     }
     
     @Override
@@ -104,11 +104,13 @@ public abstract class AbstractPipelineJobAPIImpl implements PipelineJobAPI {
             log.warn("jobId already exists in registry center, ignore, jobConfigKey={}", jobConfigKey);
             return Optional.of(jobId);
         }
-        repositoryAPI.persist(PipelineMetaDataNode.getJobRootPath(jobId), MigrationJob.class.getName());
+        repositoryAPI.persist(PipelineMetaDataNode.getJobRootPath(jobId), getJobClassName());
         repositoryAPI.persist(jobConfigKey, convertJobConfigurationToText(jobConfig));
         return Optional.of(jobId);
     }
     
+    protected abstract String getJobClassName();
+    
     private String convertJobConfigurationToText(final PipelineJobConfiguration jobConfig) {
         JobConfigurationPOJO jobConfigPOJO = new JobConfigurationPOJO();
         jobConfigPOJO.setJobName(jobConfig.getJobId());
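
Note: persisting the job root path no longer hard-codes MigrationJob (hence the import removed above); each scenario supplies its own job class through the new getJobClassName() template method. A hypothetical additional scenario would only need an override like the following sketch (ExampleScenarioJob and ExampleScenarioJobAPIImpl are illustrative names, not part of this commit):

    // Sketch of a scenario-specific implementation; the remaining abstract methods are omitted.
    public final class ExampleScenarioJobAPIImpl extends AbstractPipelineJobAPIImpl {
        
        @Override
        protected String getJobClassName() {
            return ExampleScenarioJob.class.getName();
        }
    }
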
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/prepare/InventoryTaskSplitter.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/prepare/InventoryTaskSplitter.java
index 2e72e3e2451..c6030f6cc04 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/prepare/InventoryTaskSplitter.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/prepare/InventoryTaskSplitter.java
@@ -33,11 +33,13 @@ import org.apache.shardingsphere.data.pipeline.api.job.JobStatus;
 import org.apache.shardingsphere.data.pipeline.api.job.progress.InventoryIncrementalJobItemProgress;
 import org.apache.shardingsphere.data.pipeline.api.metadata.LogicTableName;
 import org.apache.shardingsphere.data.pipeline.api.metadata.loader.PipelineTableMetaDataLoader;
+import org.apache.shardingsphere.data.pipeline.api.metadata.model.PipelineColumnMetaData;
 import org.apache.shardingsphere.data.pipeline.core.context.InventoryIncrementalJobItemContext;
 import org.apache.shardingsphere.data.pipeline.core.context.InventoryIncrementalProcessContext;
 import org.apache.shardingsphere.data.pipeline.core.exception.job.SplitPipelineJobByUniqueKeyException;
 import org.apache.shardingsphere.data.pipeline.core.exception.job.SplitPipelineJobByRangeException;
 import org.apache.shardingsphere.data.pipeline.core.execute.ExecuteEngine;
+import org.apache.shardingsphere.data.pipeline.core.metadata.loader.PipelineTableMetaDataUtil;
 import org.apache.shardingsphere.data.pipeline.core.sqlbuilder.PipelineSQLBuilderFactory;
 import org.apache.shardingsphere.data.pipeline.core.task.InventoryTask;
 import org.apache.shardingsphere.data.pipeline.core.util.PipelineJdbcUtils;
@@ -115,6 +117,13 @@ public final class InventoryTaskSplitter {
     
     private Collection<InventoryDumperConfiguration> splitByPrimaryKey(final InventoryDumperConfiguration dumperConfig, final InventoryIncrementalJobItemContext jobItemContext,
                                                                        final DataSource dataSource) {
+        if (null == dumperConfig.getUniqueKey()) {
+            String schemaName = dumperConfig.getSchemaName(new LogicTableName(dumperConfig.getLogicTableName()));
+            String actualTableName = dumperConfig.getActualTableName();
+            PipelineColumnMetaData uniqueKeyColumn = PipelineTableMetaDataUtil.getUniqueKeyColumn(schemaName, actualTableName, metaDataLoader);
+            dumperConfig.setUniqueKey(uniqueKeyColumn.getName());
+            dumperConfig.setUniqueKeyDataType(uniqueKeyColumn.getDataType());
+        }
         Collection<InventoryDumperConfiguration> result = new LinkedList<>();
         InventoryIncrementalProcessContext jobProcessContext = jobItemContext.getJobProcessContext();
         PipelineReadConfiguration readConfig = jobProcessContext.getPipelineProcessConfig().getRead();
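
Note: splitByPrimaryKey now lazily resolves the unique key column from table metadata when the dumper configuration does not already carry one, instead of assuming it was set up front. The same guard, factored into a helper purely for illustration (the helper name is an assumption; every call it makes appears in the hunk above, and metaDataLoader is the splitter's existing field):

    // Sketch only: resolve the unique key from table metadata when it is not configured yet.
    private void ensureUniqueKey(final InventoryDumperConfiguration dumperConfig) {
        if (null != dumperConfig.getUniqueKey()) {
            return;
        }
        String schemaName = dumperConfig.getSchemaName(new LogicTableName(dumperConfig.getLogicTableName()));
        PipelineColumnMetaData uniqueKeyColumn = PipelineTableMetaDataUtil.getUniqueKeyColumn(schemaName, dumperConfig.getActualTableName(), metaDataLoader);
        dumperConfig.setUniqueKey(uniqueKeyColumn.getName());
        dumperConfig.setUniqueKeyDataType(uniqueKeyColumn.getDataType());
    }
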
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationChangedJobConfigurationProcessor.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationChangedJobConfigurationProcessor.java
index b54a5a493c8..d1922948cf6 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationChangedJobConfigurationProcessor.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationChangedJobConfigurationProcessor.java
@@ -52,7 +52,9 @@ public final class MigrationChangedJobConfigurationProcessor implements Pipeline
                     log.info("{} added to executing jobs failed since it already exists", jobId);
                 } else {
                     log.info("{} executing jobs", jobId);
-                    CompletableFuture.runAsync(() -> execute(jobConfigPOJO), PipelineContext.getEventListenerExecutor());
+                    CompletableFuture.runAsync(() -> execute(jobConfigPOJO), PipelineContext.getEventListenerExecutor()).whenComplete((unused, throwable) -> {
+                        log.error("execute failed, jobId={}", jobId, throwable);
+                    });
                 }
                 break;
             case DELETED:
@@ -70,8 +72,8 @@ public final class MigrationChangedJobConfigurationProcessor implements Pipeline
         MigrationJob job = new MigrationJob();
         PipelineJobCenter.addJob(jobConfigPOJO.getJobName(), job);
         OneOffJobBootstrap oneOffJobBootstrap = new OneOffJobBootstrap(PipelineAPIFactory.getRegistryCenter(), job, jobConfigPOJO.toJobConfiguration());
-        oneOffJobBootstrap.execute();
         job.setOneOffJobBootstrap(oneOffJobBootstrap);
+        oneOffJobBootstrap.execute();
     }
     
     @Override
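
Note: two small behavioral changes here. First, the fire-and-forget CompletableFuture now gets a whenComplete callback so that failures thrown on the event-listener executor show up in the log instead of disappearing with the future. Second, setOneOffJobBootstrap(...) is moved before execute(), so the job already holds its bootstrap reference by the time the one-off run starts. The usual shape of the error-surfacing pattern, as a sketch that additionally guards the error log on a non-null throwable (a variant for illustration, not the committed code):

    // Sketch: surface async failures from a fire-and-forget future; log only on actual failure.
    CompletableFuture.runAsync(() -> execute(jobConfigPOJO), PipelineContext.getEventListenerExecutor())
            .whenComplete((unused, throwable) -> {
                if (null != throwable) {
                    log.error("execute failed, jobId={}", jobId, throwable);
                }
            });
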
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobAPIImpl.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobAPIImpl.java
index 27ef7c8f539..b4419dafa05 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobAPIImpl.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobAPIImpl.java
@@ -183,6 +183,11 @@ public final class MigrationJobAPIImpl extends InventoryIncrementalJobPublicAPII
         return marshalJobId(jobId);
     }
     
+    @Override
+    protected String getJobClassName() {
+        return MigrationJob.class.getName();
+    }
+    
     @Override
     protected YamlPipelineJobConfiguration swapToYamlJobConfiguration(final PipelineJobConfiguration jobConfig) {
         return new YamlMigrationJobConfigurationSwapper().swapToYamlConfiguration((MigrationJobConfiguration) jobConfig);