Posted to notifications@shardingsphere.apache.org by yx...@apache.org on 2022/11/01 01:50:21 UTC

[shardingsphere] branch master updated: Extract more methods to AbstractPipelineJob and AbstractSimplePipelineJob for common usage (#21875)

This is an automated email from the ASF dual-hosted git repository.

yx9o pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new 6bb74c3e8b6 Extract more methods to AbstractPipelineJob and AbstractSimplePipelineJob for common usage (#21875)
6bb74c3e8b6 is described below

commit 6bb74c3e8b667ee92805706c4f82f01561e2c028
Author: Hongsheng Zhong <zh...@apache.org>
AuthorDate: Tue Nov 1 09:50:14 2022 +0800

    Extract more methods to AbstractPipelineJob and AbstractSimplePipelineJob for common usage (#21875)
    
    * Add missed COUNT(1) in buildSplitByPrimaryKeyRangeSQL of Oracle
    
    * Extract stop() to AbstractPipelineJob
    
    * Extract prepare() to AbstractPipelineJob
    
    * Extract execute() to AbstractSimplePipelineJob
    
    * Extract shardingItem duplicated check to AbstractPipelineJob; Remove runInBackground
    
    * Clean Getter Setter
---
 .../pipeline/core/job/AbstractPipelineJob.java     | 80 ++++++++++++++++++----
 .../core/job/AbstractSimplePipelineJob.java        | 56 +++++++++++++++
 .../core/sqlbuilder/OraclePipelineSQLBuilder.java  |  2 +-
 .../consistencycheck/ConsistencyCheckJob.java      | 53 ++++----------
 .../pipeline/scenario/migration/MigrationJob.java  | 77 +++++----------------
 5 files changed, 154 insertions(+), 114 deletions(-)
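In short, the commit moves lifecycle plumbing (prepare timing and failure handling, the duplicate sharding-item guard, stop/cleanup ordering) into AbstractPipelineJob and a new AbstractSimplePipelineJob, so ConsistencyCheckJob and MigrationJob are reduced to small hooks. A condensed, self-contained sketch of the resulting template; the class name here is illustrative and the bodies are reduced to the control flow visible in the diff below:

    // Simplified template-method skeleton; the real classes carry more state.
    abstract class PipelineJobTemplate {
        
        protected final void prepare(final Object jobItemContext) {
            try {
                doPrepare(jobItemContext);
            } catch (final Exception ex) {
                // the real code also persists status and error message, then stops the job
                throw ex instanceof RuntimeException ? (RuntimeException) ex : new RuntimeException(ex);
            }
        }
        
        protected abstract void doPrepare(Object jobItemContext) throws Exception;
        
        public final void stop() {
            try {
                // innerStop(): set stopping flag, shut down bootstrap, stop tasks runners
            } finally {
                // innerClean(): clear runner map, remove progress persist context
                doClean();
            }
        }
        
        protected abstract void doClean();
    }
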

diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractPipelineJob.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractPipelineJob.java
index 10d59e68431..03ca0efd927 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractPipelineJob.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractPipelineJob.java
@@ -20,8 +20,13 @@ package org.apache.shardingsphere.data.pipeline.core.job;
 import lombok.AccessLevel;
 import lombok.Getter;
 import lombok.Setter;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.shardingsphere.data.pipeline.api.context.PipelineJobItemContext;
+import org.apache.shardingsphere.data.pipeline.api.job.JobStatus;
 import org.apache.shardingsphere.data.pipeline.api.job.PipelineJob;
 import org.apache.shardingsphere.data.pipeline.api.task.PipelineTasksRunner;
+import org.apache.shardingsphere.data.pipeline.core.api.PipelineAPIFactory;
+import org.apache.shardingsphere.data.pipeline.core.api.PipelineJobAPI;
 import org.apache.shardingsphere.data.pipeline.core.job.progress.persist.PipelineJobProgressPersistService;
 import org.apache.shardingsphere.data.pipeline.core.metadata.node.PipelineMetaDataNode;
 import org.apache.shardingsphere.data.pipeline.core.util.PipelineDistributedBarrier;
@@ -36,27 +41,53 @@ import java.util.concurrent.ConcurrentHashMap;
 /**
  * Abstract pipeline job.
  */
-@Getter
+@Slf4j
 public abstract class AbstractPipelineJob implements PipelineJob {
     
-    @Setter
+    @Getter
     private volatile String jobId;
     
-    @Setter
+    @Getter(value = AccessLevel.PROTECTED)
+    private volatile PipelineJobAPI jobAPI;
+    
+    @Getter
     private volatile boolean stopping;
     
     @Setter
     private volatile JobBootstrap jobBootstrap;
     
-    @Getter(value = AccessLevel.PRIVATE)
     private final Map<Integer, PipelineTasksRunner> tasksRunnerMap = new ConcurrentHashMap<>();
     
     private final PipelineDistributedBarrier distributedBarrier = PipelineDistributedBarrier.getInstance();
     
-    protected void runInBackground(final Runnable runnable) {
-        new Thread(runnable).start();
+    protected void setJobId(final String jobId) {
+        this.jobId = jobId;
+        jobAPI = PipelineAPIFactory.getPipelineJobAPI(PipelineJobIdUtils.parseJobType(jobId));
+    }
+    
+    protected void prepare(final PipelineJobItemContext jobItemContext) {
+        try {
+            long startTimeMillis = System.currentTimeMillis();
+            doPrepare(jobItemContext);
+            log.info("prepare cost {} ms", System.currentTimeMillis() - startTimeMillis);
+            // CHECKSTYLE:OFF
+        } catch (final Exception ex) {
+            // CHECKSTYLE:ON
+            String jobId = jobItemContext.getJobId();
+            log.error("job prepare failed, {}-{}", jobId, jobItemContext.getShardingItem(), ex);
+            jobItemContext.setStatus(JobStatus.PREPARING_FAILURE);
+            jobAPI.persistJobItemProgress(jobItemContext);
+            jobAPI.persistJobItemErrorMessage(jobItemContext.getJobId(), jobItemContext.getShardingItem(), ex);
+            jobAPI.stop(jobId);
+            if (ex instanceof RuntimeException) {
+                throw (RuntimeException) ex;
+            }
+            throw new RuntimeException(ex);
+        }
     }
     
+    protected abstract void doPrepare(PipelineJobItemContext jobItemContext) throws Exception;
+    
     @Override
     public Optional<PipelineTasksRunner> getTasksRunner(final int shardingItem) {
         return Optional.ofNullable(tasksRunnerMap.get(shardingItem));
@@ -67,22 +98,41 @@ public abstract class AbstractPipelineJob implements PipelineJob {
         return new ArrayList<>(tasksRunnerMap.keySet());
     }
     
-    protected void addTasksRunner(final int shardingItem, final PipelineTasksRunner tasksRunner) {
-        tasksRunnerMap.put(shardingItem, tasksRunner);
+    protected boolean addTasksRunner(final int shardingItem, final PipelineTasksRunner tasksRunner) {
+        if (null != tasksRunnerMap.putIfAbsent(shardingItem, tasksRunner)) {
+            log.warn("shardingItem {} tasks runner exists, ignore", shardingItem);
+            return false;
+        }
         PipelineJobProgressPersistService.addJobProgressPersistContext(getJobId(), shardingItem);
         distributedBarrier.persistEphemeralChildrenNode(PipelineMetaDataNode.getJobBarrierEnablePath(getJobId()), shardingItem);
+        return true;
     }
     
-    protected boolean containsTasksRunner(final int shardingItem) {
-        return tasksRunnerMap.containsKey(shardingItem);
+    @Override
+    public void stop() {
+        try {
+            innerStop();
+        } finally {
+            innerClean();
+            doClean();
+        }
     }
     
-    protected void clearTasksRunner() {
-        tasksRunnerMap.clear();
-        PipelineJobProgressPersistService.removeJobProgressPersistContext(jobId);
+    private void innerStop() {
+        stopping = true;
+        if (null != jobBootstrap) {
+            jobBootstrap.shutdown();
+        }
+        log.info("stop tasks runner, jobId={}", getJobId());
+        for (PipelineTasksRunner each : tasksRunnerMap.values()) {
+            each.stop();
+        }
     }
     
-    protected Collection<PipelineTasksRunner> getTasksRunners() {
-        return tasksRunnerMap.values();
+    private void innerClean() {
+        tasksRunnerMap.clear();
+        PipelineJobProgressPersistService.removeJobProgressPersistContext(getJobId());
     }
+    
+    protected abstract void doClean();
 }
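
A detail worth noting in the new stop(): the try/finally guarantees that innerClean() and doClean() run even if shutting down the bootstrap or one of the tasks runners throws, though a throwing runner does skip the runners after it. A standalone toy demonstrating that guarantee (mock types, not the real classes):

    import java.util.Arrays;
    import java.util.List;
    
    public final class StopCleanupDemo {
        
        public static void main(final String[] args) {
            List<Runnable> runners = Arrays.asList(
                    () -> { throw new IllegalStateException("runner failed to stop"); },
                    () -> System.out.println("second runner stopped"));   // never printed: first runner throws
            try {
                try {
                    runners.forEach(Runnable::run);
                } finally {
                    System.out.println("doClean() still runs");          // mirrors stop()'s finally block
                }
            } catch (final IllegalStateException ex) {
                System.out.println("caught: " + ex.getMessage());
            }
        }
    }
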
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractSimplePipelineJob.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractSimplePipelineJob.java
new file mode 100644
index 00000000000..9e26b190c43
--- /dev/null
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractSimplePipelineJob.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.data.pipeline.core.job;
+
+import lombok.extern.slf4j.Slf4j;
+import org.apache.shardingsphere.data.pipeline.api.context.PipelineJobItemContext;
+import org.apache.shardingsphere.data.pipeline.api.task.PipelineTasksRunner;
+import org.apache.shardingsphere.elasticjob.api.ShardingContext;
+import org.apache.shardingsphere.elasticjob.simple.job.SimpleJob;
+
+/**
+ * Abstract simple pipeline job.
+ */
+@Slf4j
+public abstract class AbstractSimplePipelineJob extends AbstractPipelineJob implements SimpleJob {
+    
+    protected abstract PipelineJobItemContext buildPipelineJobItemContext(ShardingContext shardingContext);
+    
+    protected abstract PipelineTasksRunner buildPipelineTasksRunner(PipelineJobItemContext pipelineJobItemContext);
+    
+    @Override
+    public void execute(final ShardingContext shardingContext) {
+        String jobId = shardingContext.getJobName();
+        int shardingItem = shardingContext.getShardingItem();
+        log.info("Execute job {}-{}", jobId, shardingItem);
+        if (isStopping()) {
+            log.info("stopping true, ignore");
+            return;
+        }
+        setJobId(jobId);
+        PipelineJobItemContext jobItemContext = buildPipelineJobItemContext(shardingContext);
+        PipelineTasksRunner tasksRunner = buildPipelineTasksRunner(jobItemContext);
+        if (!addTasksRunner(shardingItem, tasksRunner)) {
+            return;
+        }
+        getJobAPI().cleanJobItemErrorMessage(jobId, jobItemContext.getShardingItem());
+        prepare(jobItemContext);
+        log.info("start tasks runner, jobId={}, shardingItem={}", jobId, shardingItem);
+        tasksRunner.start();
+    }
+}
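
execute() now centralizes the duplicate-dispatch guard: addTasksRunner() relies on ConcurrentMap.putIfAbsent(), which returns the existing value when the sharding item is already registered, so a repeated dispatch returns early before prepare() ever runs. A self-contained illustration of that guard, with a String standing in for the tasks runner:

    import java.util.Map;
    import java.util.concurrent.ConcurrentHashMap;
    
    public final class DuplicateDispatchDemo {
        
        private static final Map<Integer, String> TASKS_RUNNER_MAP = new ConcurrentHashMap<>();
        
        private static boolean addTasksRunner(final int shardingItem, final String tasksRunner) {
            if (null != TASKS_RUNNER_MAP.putIfAbsent(shardingItem, tasksRunner)) {
                System.out.printf("shardingItem %d tasks runner exists, ignore%n", shardingItem);
                return false;
            }
            return true;
        }
        
        public static void main(final String[] args) {
            System.out.println(addTasksRunner(0, "first"));  // true: first registration wins
            System.out.println(addTasksRunner(0, "second")); // false: duplicate is ignored
        }
    }
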
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/sqlbuilder/OraclePipelineSQLBuilder.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/sqlbuilder/OraclePipelineSQLBuilder.java
index 14939768a4e..67a4e4716eb 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/sqlbuilder/OraclePipelineSQLBuilder.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/sqlbuilder/OraclePipelineSQLBuilder.java
@@ -69,7 +69,7 @@ public final class OraclePipelineSQLBuilder extends AbstractPipelineSQLBuilder {
     public String buildSplitByPrimaryKeyRangeSQL(final String schemaName, final String tableName, final String primaryKey) {
         String qualifiedTableName = getQualifiedTableName(schemaName, tableName);
         String quotedUniqueKey = quote(primaryKey);
-        return String.format("SELECT MAX(%s) FROM (SELECT * FROM (SELECT %s FROM %s WHERE %s>=? ORDER BY %s) WHERE ROWNUM<=?) t",
+        return String.format("SELECT MAX(%s), COUNT(1) FROM (SELECT * FROM (SELECT %s FROM %s WHERE %s>=? ORDER BY %s) WHERE ROWNUM<=?) t",
                 quotedUniqueKey, quotedUniqueKey, qualifiedTableName, quotedUniqueKey, quotedUniqueKey);
     }
     
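
The Oracle fix adds the COUNT(1) column that was missing from this dialect's split query, so the caller can read both the chunk's maximum key and its row count from a single result row. A hypothetical sketch of how such a query is typically consumed; the table and column names (t_order, order_id) and the JDBC wiring are illustrative, not the real caller:

    import java.sql.Connection;
    import java.sql.PreparedStatement;
    import java.sql.ResultSet;
    import java.sql.SQLException;
    
    final class SplitByPrimaryKeyRangeDemo {
        
        // Returns {maxKey, rowCount} for the chunk starting at lowerBound.
        static long[] nextChunk(final Connection connection, final long lowerBound, final long chunkSize) throws SQLException {
            String sql = "SELECT MAX(\"order_id\"), COUNT(1) FROM (SELECT * FROM "
                    + "(SELECT \"order_id\" FROM t_order WHERE \"order_id\">=? ORDER BY \"order_id\") WHERE ROWNUM<=?) t";
            try (PreparedStatement statement = connection.prepareStatement(sql)) {
                statement.setLong(1, lowerBound);
                statement.setLong(2, chunkSize);
                try (ResultSet resultSet = statement.executeQuery()) {
                    resultSet.next();
                    return new long[]{resultSet.getLong(1), resultSet.getLong(2)};
                }
            }
        }
    }

A row count below the requested chunk size would indicate the final chunk of the key range.
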
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJob.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJob.java
index 19b866f8641..ac8f1f43580 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJob.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJob.java
@@ -19,58 +19,35 @@ package org.apache.shardingsphere.data.pipeline.scenario.consistencycheck;
 
 import lombok.extern.slf4j.Slf4j;
 import org.apache.shardingsphere.data.pipeline.api.config.job.ConsistencyCheckJobConfiguration;
+import org.apache.shardingsphere.data.pipeline.api.context.PipelineJobItemContext;
 import org.apache.shardingsphere.data.pipeline.api.job.JobStatus;
-import org.apache.shardingsphere.data.pipeline.api.job.PipelineJob;
 import org.apache.shardingsphere.data.pipeline.api.task.PipelineTasksRunner;
-import org.apache.shardingsphere.data.pipeline.core.job.AbstractPipelineJob;
+import org.apache.shardingsphere.data.pipeline.core.job.AbstractSimplePipelineJob;
 import org.apache.shardingsphere.data.pipeline.yaml.job.YamlConsistencyCheckJobConfigurationSwapper;
 import org.apache.shardingsphere.elasticjob.api.ShardingContext;
-import org.apache.shardingsphere.elasticjob.simple.job.SimpleJob;
 
 /**
  * Consistency check job.
  */
 @Slf4j
-public final class ConsistencyCheckJob extends AbstractPipelineJob implements SimpleJob, PipelineJob {
-    
-    private final ConsistencyCheckJobAPI jobAPI = ConsistencyCheckJobAPIFactory.getInstance();
+public final class ConsistencyCheckJob extends AbstractSimplePipelineJob {
     
     @Override
-    public void execute(final ShardingContext shardingContext) {
-        String checkJobId = shardingContext.getJobName();
-        int shardingItem = shardingContext.getShardingItem();
-        log.info("Execute job {}-{}", checkJobId, shardingItem);
-        if (isStopping()) {
-            log.info("stopping true, ignore");
-            return;
-        }
-        setJobId(checkJobId);
+    protected ConsistencyCheckJobItemContext buildPipelineJobItemContext(final ShardingContext shardingContext) {
         ConsistencyCheckJobConfiguration jobConfig = new YamlConsistencyCheckJobConfigurationSwapper().swapToObject(shardingContext.getJobParameter());
-        ConsistencyCheckJobItemContext jobItemContext = new ConsistencyCheckJobItemContext(jobConfig, shardingItem, JobStatus.RUNNING);
-        if (containsTasksRunner(shardingItem)) {
-            log.warn("tasksRunnerMap contains shardingItem {}, ignore", shardingItem);
-            return;
-        }
-        log.info("start tasks runner, jobId={}, shardingItem={}", getJobId(), shardingItem);
-        jobAPI.cleanJobItemErrorMessage(jobItemContext.getJobId(), jobItemContext.getShardingItem());
-        ConsistencyCheckTasksRunner tasksRunner = new ConsistencyCheckTasksRunner(jobItemContext);
-        tasksRunner.start();
-        addTasksRunner(shardingItem, tasksRunner);
+        return new ConsistencyCheckJobItemContext(jobConfig, shardingContext.getShardingItem(), JobStatus.RUNNING);
+    }
+    
+    @Override
+    protected PipelineTasksRunner buildPipelineTasksRunner(final PipelineJobItemContext pipelineJobItemContext) {
+        return new ConsistencyCheckTasksRunner((ConsistencyCheckJobItemContext) pipelineJobItemContext);
+    }
+    
+    @Override
+    protected void doPrepare(final PipelineJobItemContext jobItemContext) {
     }
     
     @Override
-    public void stop() {
-        setStopping(true);
-        if (null != getJobBootstrap()) {
-            getJobBootstrap().shutdown();
-        }
-        if (null == getJobId()) {
-            log.info("stop consistency check job, jobId is null, ignore");
-            return;
-        }
-        for (PipelineTasksRunner each : getTasksRunners()) {
-            each.stop();
-        }
-        clearTasksRunner();
+    protected void doClean() {
     }
 }
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJob.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJob.java
index 975e72d1f6b..cdf071c6226 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJob.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJob.java
@@ -20,16 +20,16 @@ package org.apache.shardingsphere.data.pipeline.scenario.migration;
 import lombok.RequiredArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.shardingsphere.data.pipeline.api.config.job.MigrationJobConfiguration;
+import org.apache.shardingsphere.data.pipeline.api.context.PipelineJobItemContext;
 import org.apache.shardingsphere.data.pipeline.api.datasource.PipelineDataSourceManager;
-import org.apache.shardingsphere.data.pipeline.api.job.JobStatus;
 import org.apache.shardingsphere.data.pipeline.api.job.progress.InventoryIncrementalJobItemProgress;
 import org.apache.shardingsphere.data.pipeline.api.task.PipelineTasksRunner;
+import org.apache.shardingsphere.data.pipeline.core.context.InventoryIncrementalJobItemContext;
 import org.apache.shardingsphere.data.pipeline.core.datasource.DefaultPipelineDataSourceManager;
-import org.apache.shardingsphere.data.pipeline.core.job.AbstractPipelineJob;
+import org.apache.shardingsphere.data.pipeline.core.job.AbstractSimplePipelineJob;
 import org.apache.shardingsphere.data.pipeline.core.task.InventoryIncrementalTasksRunner;
 import org.apache.shardingsphere.data.pipeline.yaml.job.YamlMigrationJobConfigurationSwapper;
 import org.apache.shardingsphere.elasticjob.api.ShardingContext;
-import org.apache.shardingsphere.elasticjob.simple.job.SimpleJob;
 
 import java.sql.SQLException;
 
@@ -38,7 +38,7 @@ import java.sql.SQLException;
  */
 @RequiredArgsConstructor
 @Slf4j
-public final class MigrationJob extends AbstractPipelineJob implements SimpleJob {
+public final class MigrationJob extends AbstractSimplePipelineJob {
     
     private final MigrationJobAPI jobAPI = MigrationJobAPIFactory.getInstance();
     
@@ -48,71 +48,28 @@ public final class MigrationJob extends AbstractPipelineJob implements SimpleJob
     private final MigrationJobPreparer jobPreparer = new MigrationJobPreparer();
     
     @Override
-    public void execute(final ShardingContext shardingContext) {
+    protected InventoryIncrementalJobItemContext buildPipelineJobItemContext(final ShardingContext shardingContext) {
         int shardingItem = shardingContext.getShardingItem();
-        log.info("Execute job {}-{}", shardingContext.getJobName(), shardingItem);
-        if (isStopping()) {
-            log.info("stopping true, ignore");
-            return;
-        }
-        setJobId(shardingContext.getJobName());
         MigrationJobConfiguration jobConfig = new YamlMigrationJobConfigurationSwapper().swapToObject(shardingContext.getJobParameter());
         InventoryIncrementalJobItemProgress initProgress = jobAPI.getJobItemProgress(shardingContext.getJobName(), shardingItem);
         MigrationProcessContext jobProcessContext = jobAPI.buildPipelineProcessContext(jobConfig);
         MigrationTaskConfiguration taskConfig = jobAPI.buildTaskConfiguration(jobConfig, shardingItem, jobProcessContext.getPipelineProcessConfig());
-        MigrationJobItemContext jobItemContext = new MigrationJobItemContext(jobConfig, shardingItem, initProgress, jobProcessContext, taskConfig, dataSourceManager);
-        if (containsTasksRunner(shardingItem)) {
-            log.warn("tasksRunnerMap contains shardingItem {}, ignore", shardingItem);
-            return;
-        }
-        log.info("start tasks runner, jobId={}, shardingItem={}", getJobId(), shardingItem);
-        jobAPI.cleanJobItemErrorMessage(jobItemContext.getJobId(), jobItemContext.getShardingItem());
-        InventoryIncrementalTasksRunner tasksRunner = new InventoryIncrementalTasksRunner(jobItemContext, jobItemContext.getInventoryTasks(), jobItemContext.getIncrementalTasks());
-        runInBackground(() -> {
-            prepare(jobItemContext);
-            tasksRunner.start();
-        });
-        addTasksRunner(shardingItem, tasksRunner);
+        return new MigrationJobItemContext(jobConfig, shardingItem, initProgress, jobProcessContext, taskConfig, dataSourceManager);
     }
     
-    private void prepare(final MigrationJobItemContext jobItemContext) {
-        try {
-            long startTimeMillis = System.currentTimeMillis();
-            jobPreparer.prepare(jobItemContext);
-            log.info("prepare cost {} ms", System.currentTimeMillis() - startTimeMillis);
-            // CHECKSTYLE:OFF
-        } catch (final SQLException | RuntimeException ex) {
-            // CHECKSTYLE:ON
-            log.error("job prepare failed, {}-{}", getJobId(), jobItemContext.getShardingItem(), ex);
-            jobAPI.stop(jobItemContext.getJobId());
-            jobItemContext.setStatus(JobStatus.PREPARING_FAILURE);
-            jobAPI.persistJobItemProgress(jobItemContext);
-            jobAPI.persistJobItemErrorMessage(jobItemContext.getJobId(), jobItemContext.getShardingItem(), ex);
-            if (ex instanceof RuntimeException) {
-                throw (RuntimeException) ex;
-            }
-            throw new RuntimeException(ex);
-        }
+    @Override
+    protected PipelineTasksRunner buildPipelineTasksRunner(final PipelineJobItemContext pipelineJobItemContext) {
+        InventoryIncrementalJobItemContext jobItemContext = (InventoryIncrementalJobItemContext) pipelineJobItemContext;
+        return new InventoryIncrementalTasksRunner(jobItemContext, jobItemContext.getInventoryTasks(), jobItemContext.getIncrementalTasks());
     }
     
-    /**
-     * Stop job.
-     */
-    public void stop() {
-        setStopping(true);
+    @Override
+    protected void doPrepare(final PipelineJobItemContext jobItemContext) throws SQLException {
+        jobPreparer.prepare((MigrationJobItemContext) jobItemContext);
+    }
+    
+    @Override
+    public void doClean() {
         dataSourceManager.close();
-        if (null != getJobBootstrap()) {
-            getJobBootstrap().shutdown();
-        }
-        String jobId = getJobId();
-        if (null == jobId) {
-            log.info("stop, jobId is null, ignore");
-            return;
-        }
-        log.info("stop tasks runner, jobId={}", jobId);
-        for (PipelineTasksRunner each : getTasksRunners()) {
-            each.stop();
-        }
-        clearTasksRunner();
     }
 }
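
One behavioral consequence of the MigrationJob change: with runInBackground removed, preparation and task startup no longer run on a detached thread. Condensed from the diff above (excerpts, not standalone code):

    // Before: fire-and-forget thread, runner registered after scheduling.
    runInBackground(() -> {
        prepare(jobItemContext);
        tasksRunner.start();
    });
    addTasksRunner(shardingItem, tasksRunner);
    
    // After (AbstractSimplePipelineJob.execute): registration first, then the same
    // steps inline, so prepare failures now surface to the ElasticJob caller.
    if (!addTasksRunner(shardingItem, tasksRunner)) {
        return;
    }
    prepare(jobItemContext);
    tasksRunner.start();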