You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@shardingsphere.apache.org by az...@apache.org on 2023/03/03 11:21:34 UTC

[shardingsphere] branch master updated: Improve migrate table param check; Add column in show migration DistSQLs (#24442)

This is an automated email from the ASF dual-hosted git repository.

azexin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new 334a409201d Improve migrate table param check; Add column in show migration DistSQLs (#24442)
334a409201d is described below

commit 334a409201d91c1b95b8065a0ac15193a5d25ded
Author: Hongsheng Zhong <zh...@apache.org>
AuthorDate: Fri Mar 3 19:21:26 2023 +0800

    Improve migrate table param check; Add column in show migration DistSQLs (#24442)
    
    * Check dataSourceName exists before migrating table
    
    * Add parameter invalid unit tests for MigrationJobAPI.createJobAndStart
    
    * Add tables column in SHOW MIGRATION STATUS; Refactor getJobProgress and getJobInfo
    
    * Add check_failed_tables column in SHOW MIGRATION CHECK STATUS
---
 .../api/pojo/ConsistencyCheckJobItemInfo.java      |  2 ++
 .../api/pojo/InventoryIncrementalJobItemInfo.java  |  2 ++
 .../api/pojo/TableBasedPipelineJobInfo.java        |  1 +
 .../data/pipeline/cdc/api/impl/CDCJobAPI.java      |  6 ++---
 .../AbstractInventoryIncrementalJobAPIImpl.java    |  8 ++++--
 .../core/api/impl/AbstractPipelineJobAPIImpl.java  |  7 ++++++
 .../query/ShowMigrationCheckStatusExecutor.java    |  6 ++---
 .../query/ShowMigrationJobStatusExecutor.java      |  8 +++---
 .../ShowMigrationCheckStatusExecutorTest.java      | 29 ++++++++++++++++++++--
 .../query/ShowMigrationJobStatusExecutorTest.java  | 21 ++++++++++++++--
 .../api/impl/ConsistencyCheckJobAPI.java           |  3 +++
 .../migration/api/impl/MigrationJobAPI.java        |  5 ++--
 .../migration/api/impl/MigrationJobAPITest.java    | 17 +++++++++++++
 13 files changed, 96 insertions(+), 19 deletions(-)

diff --git a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/pojo/ConsistencyCheckJobItemInfo.java b/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/pojo/ConsistencyCheckJobItemInfo.java
index 76376460629..4642a599b63 100644
--- a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/pojo/ConsistencyCheckJobItemInfo.java
+++ b/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/pojo/ConsistencyCheckJobItemInfo.java
@@ -33,6 +33,8 @@ public final class ConsistencyCheckJobItemInfo {
     
     private Boolean checkSuccess;
     
+    private String checkFailedTableNames;
+    
     private int finishedPercentage;
     
     private long remainingSeconds;
diff --git a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/pojo/InventoryIncrementalJobItemInfo.java b/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/pojo/InventoryIncrementalJobItemInfo.java
index 117cdcbaac3..9ff478a61e4 100644
--- a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/pojo/InventoryIncrementalJobItemInfo.java
+++ b/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/pojo/InventoryIncrementalJobItemInfo.java
@@ -30,6 +30,8 @@ public class InventoryIncrementalJobItemInfo {
     
     private final int shardingItem;
     
+    private final String tableNames;
+    
     private final InventoryIncrementalJobItemProgress jobItemProgress;
     
     private final long startTimeMillis;
diff --git a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/pojo/TableBasedPipelineJobInfo.java b/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/pojo/TableBasedPipelineJobInfo.java
index ef321c784e4..729f0caabf9 100644
--- a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/pojo/TableBasedPipelineJobInfo.java
+++ b/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/pojo/TableBasedPipelineJobInfo.java
@@ -31,6 +31,7 @@ public final class TableBasedPipelineJobInfo implements PipelineJobInfo {
     
     private final String databaseName;
     
+    // TODO Rename
     private final String table;
     
     public TableBasedPipelineJobInfo(final PipelineJobMetaData jobMetaData, final String table) {
diff --git a/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPI.java b/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPI.java
index 6f644730758..c43f7cda3a2 100644
--- a/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPI.java
+++ b/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPI.java
@@ -41,7 +41,6 @@ import org.apache.shardingsphere.data.pipeline.api.job.progress.InventoryIncreme
 import org.apache.shardingsphere.data.pipeline.api.job.progress.JobItemIncrementalTasksProgress;
 import org.apache.shardingsphere.data.pipeline.api.metadata.ActualTableName;
 import org.apache.shardingsphere.data.pipeline.api.metadata.LogicTableName;
-import org.apache.shardingsphere.data.pipeline.api.pojo.PipelineJobInfo;
 import org.apache.shardingsphere.data.pipeline.api.pojo.PipelineJobMetaData;
 import org.apache.shardingsphere.data.pipeline.api.pojo.TableBasedPipelineJobInfo;
 import org.apache.shardingsphere.data.pipeline.api.task.progress.IncrementalTaskProgress;
@@ -285,10 +284,9 @@ public final class CDCJobAPI extends AbstractInventoryIncrementalJobAPIImpl {
     }
     
     @Override
-    protected PipelineJobInfo getJobInfo(final String jobId) {
+    protected TableBasedPipelineJobInfo getJobInfo(final String jobId) {
         JobConfigurationPOJO jobConfigPOJO = getElasticJobConfigPOJO(jobId);
-        PipelineJobMetaData jobMetaData = new PipelineJobMetaData(jobId, !jobConfigPOJO.isDisabled(),
-                jobConfigPOJO.getShardingTotalCount(), jobConfigPOJO.getProps().getProperty("create_time"), jobConfigPOJO.getProps().getProperty("stop_time"), jobConfigPOJO.getJobParameter());
+        PipelineJobMetaData jobMetaData = buildPipelineJobMetaData(jobConfigPOJO);
         CDCJobConfiguration jobConfig = getJobConfiguration(jobConfigPOJO);
         return new TableBasedPipelineJobInfo(jobMetaData, jobConfig.getDatabaseName(), String.join(", ", jobConfig.getSchemaTableNames()));
     }
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/AbstractInventoryIncrementalJobAPIImpl.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/AbstractInventoryIncrementalJobAPIImpl.java
index 8188d0cc8e3..d2a67063403 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/AbstractInventoryIncrementalJobAPIImpl.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/AbstractInventoryIncrementalJobAPIImpl.java
@@ -32,6 +32,7 @@ import org.apache.shardingsphere.data.pipeline.api.job.progress.JobItemInventory
 import org.apache.shardingsphere.data.pipeline.api.job.progress.JobOffsetInfo;
 import org.apache.shardingsphere.data.pipeline.api.pojo.DataConsistencyCheckAlgorithmInfo;
 import org.apache.shardingsphere.data.pipeline.api.pojo.InventoryIncrementalJobItemInfo;
+import org.apache.shardingsphere.data.pipeline.api.pojo.TableBasedPipelineJobInfo;
 import org.apache.shardingsphere.data.pipeline.api.task.progress.InventoryTaskProgress;
 import org.apache.shardingsphere.data.pipeline.core.api.InventoryIncrementalJobAPI;
 import org.apache.shardingsphere.data.pipeline.core.api.PipelineAPIFactory;
@@ -98,6 +99,8 @@ public abstract class AbstractInventoryIncrementalJobAPIImpl extends AbstractPip
         return result;
     }
     
+    protected abstract TableBasedPipelineJobInfo getJobInfo(String jobId);
+    
     @Override
     public Map<Integer, InventoryIncrementalJobItemProgress> getJobProgress(final PipelineJobConfiguration jobConfig) {
         String jobId = jobConfig.getJobId();
@@ -118,9 +121,10 @@ public abstract class AbstractInventoryIncrementalJobAPIImpl extends AbstractPip
         List<InventoryIncrementalJobItemInfo> result = new LinkedList<>();
         for (Entry<Integer, InventoryIncrementalJobItemProgress> entry : jobProgress.entrySet()) {
             int shardingItem = entry.getKey();
+            TableBasedPipelineJobInfo jobInfo = getJobInfo(jobId);
             InventoryIncrementalJobItemProgress jobItemProgress = entry.getValue();
             if (null == jobItemProgress) {
-                result.add(new InventoryIncrementalJobItemInfo(shardingItem, null, startTimeMillis, 0, ""));
+                result.add(new InventoryIncrementalJobItemInfo(shardingItem, jobInfo.getTable(), null, startTimeMillis, 0, ""));
                 continue;
             }
             int inventoryFinishedPercentage = 0;
@@ -130,7 +134,7 @@ public abstract class AbstractInventoryIncrementalJobAPIImpl extends AbstractPip
                 inventoryFinishedPercentage = (int) Math.min(100, jobItemProgress.getProcessedRecordsCount() * 100 / jobItemProgress.getInventoryRecordsCount());
             }
             String errorMessage = getJobItemErrorMessage(jobId, shardingItem);
-            result.add(new InventoryIncrementalJobItemInfo(shardingItem, jobItemProgress, startTimeMillis, inventoryFinishedPercentage, errorMessage));
+            result.add(new InventoryIncrementalJobItemInfo(shardingItem, jobInfo.getTable(), jobItemProgress, startTimeMillis, inventoryFinishedPercentage, errorMessage));
         }
         return result;
     }
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/AbstractPipelineJobAPIImpl.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/AbstractPipelineJobAPIImpl.java
index f774125c855..919e3fc09ba 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/AbstractPipelineJobAPIImpl.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/AbstractPipelineJobAPIImpl.java
@@ -23,6 +23,7 @@ import org.apache.shardingsphere.data.pipeline.api.config.job.PipelineJobConfigu
 import org.apache.shardingsphere.data.pipeline.api.config.job.yaml.YamlPipelineJobConfiguration;
 import org.apache.shardingsphere.data.pipeline.api.job.PipelineJobId;
 import org.apache.shardingsphere.data.pipeline.api.pojo.PipelineJobInfo;
+import org.apache.shardingsphere.data.pipeline.api.pojo.PipelineJobMetaData;
 import org.apache.shardingsphere.data.pipeline.core.api.GovernanceRepositoryAPI;
 import org.apache.shardingsphere.data.pipeline.core.api.PipelineAPIFactory;
 import org.apache.shardingsphere.data.pipeline.core.api.PipelineJobAPI;
@@ -81,8 +82,14 @@ public abstract class AbstractPipelineJobAPIImpl implements PipelineJobAPI {
                 .filter(each -> PipelineJobIdUtils.parseJobType(each.getJobName()).getTypeCode().equals(getJobType().getTypeCode()));
     }
     
+    // TODO Add getJobInfo
     protected abstract PipelineJobInfo getJobInfo(String jobId);
     
+    protected PipelineJobMetaData buildPipelineJobMetaData(final JobConfigurationPOJO jobConfigPOJO) {
+        return new PipelineJobMetaData(jobConfigPOJO.getJobName(), !jobConfigPOJO.isDisabled(),
+                jobConfigPOJO.getShardingTotalCount(), jobConfigPOJO.getProps().getProperty("create_time"), jobConfigPOJO.getProps().getProperty("stop_time"), jobConfigPOJO.getJobParameter());
+    }
+    
     @Override
     public Optional<String> start(final PipelineJobConfiguration jobConfig) {
         String jobId = jobConfig.getJobId();
diff --git a/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/query/ShowMigrationCheckStatusExecutor.java b/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/query/ShowMigrationCheckStatusExecutor.java
index 54941510d76..bba29e519eb 100644
--- a/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/query/ShowMigrationCheckStatusExecutor.java
+++ b/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/query/ShowMigrationCheckStatusExecutor.java
@@ -46,9 +46,9 @@ public final class ShowMigrationCheckStatusExecutor implements QueryableRALExecu
         return result;
     }
     
-    private LocalDataQueryResultRow convert(final ConsistencyCheckJobItemInfo info) {
+    LocalDataQueryResultRow convert(final ConsistencyCheckJobItemInfo info) {
         String checkResult = null == info.getCheckSuccess() ? "" : info.getCheckSuccess().toString();
-        return new LocalDataQueryResultRow(Optional.ofNullable(info.getTableNames()).orElse(""), checkResult,
+        return new LocalDataQueryResultRow(Optional.ofNullable(info.getTableNames()).orElse(""), checkResult, Optional.ofNullable(info.getCheckFailedTableNames()).orElse(""),
                 String.valueOf(info.getFinishedPercentage()), info.getRemainingSeconds(),
                 Optional.ofNullable(info.getCheckBeginTime()).orElse(""), Optional.ofNullable(info.getCheckEndTime()).orElse(""),
                 info.getDurationSeconds(), Optional.ofNullable(info.getErrorMessage()).orElse(""));
@@ -56,7 +56,7 @@ public final class ShowMigrationCheckStatusExecutor implements QueryableRALExecu
     
     @Override
     public Collection<String> getColumnNames() {
-        return Arrays.asList("tables", "result", "finished_percentage", "remaining_seconds", "check_begin_time", "check_end_time", "duration_seconds", "error_message");
+        return Arrays.asList("tables", "result", "check_failed_tables", "finished_percentage", "remaining_seconds", "check_begin_time", "check_end_time", "duration_seconds", "error_message");
     }
     
     @Override
diff --git a/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/query/ShowMigrationJobStatusExecutor.java b/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/query/ShowMigrationJobStatusExecutor.java
index a9462401b5f..e6b5cd3640b 100644
--- a/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/query/ShowMigrationJobStatusExecutor.java
+++ b/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/query/ShowMigrationJobStatusExecutor.java
@@ -45,24 +45,24 @@ public final class ShowMigrationJobStatusExecutor implements QueryableRALExecuto
         return jobItemInfos.stream().map(each -> generateResultRow(each, currentTimeMillis)).collect(Collectors.toList());
     }
     
-    private LocalDataQueryResultRow generateResultRow(final InventoryIncrementalJobItemInfo jobItemInfo, final long currentTimeMillis) {
+    LocalDataQueryResultRow generateResultRow(final InventoryIncrementalJobItemInfo jobItemInfo, final long currentTimeMillis) {
         InventoryIncrementalJobItemProgress jobItemProgress = jobItemInfo.getJobItemProgress();
         if (null == jobItemProgress) {
-            return new LocalDataQueryResultRow(jobItemInfo.getShardingItem(), "", "", "", "", "", "", jobItemInfo.getErrorMessage());
+            return new LocalDataQueryResultRow(jobItemInfo.getShardingItem(), "", "", "", "", "", "", "", jobItemInfo.getErrorMessage());
         }
         String incrementalIdleSeconds = "";
         if (jobItemProgress.getIncremental().getIncrementalLatestActiveTimeMillis() > 0) {
             long latestActiveTimeMillis = Math.max(jobItemInfo.getStartTimeMillis(), jobItemProgress.getIncremental().getIncrementalLatestActiveTimeMillis());
             incrementalIdleSeconds = String.valueOf(TimeUnit.MILLISECONDS.toSeconds(currentTimeMillis - latestActiveTimeMillis));
         }
-        return new LocalDataQueryResultRow(jobItemInfo.getShardingItem(), jobItemProgress.getDataSourceName(), jobItemProgress.getStatus(),
+        return new LocalDataQueryResultRow(jobItemInfo.getShardingItem(), jobItemProgress.getDataSourceName(), jobItemInfo.getTableNames(), jobItemProgress.getStatus(),
                 jobItemProgress.isActive() ? Boolean.TRUE.toString() : Boolean.FALSE.toString(), jobItemProgress.getProcessedRecordsCount(), jobItemInfo.getInventoryFinishedPercentage(),
                 incrementalIdleSeconds, jobItemInfo.getErrorMessage());
     }
     
     @Override
     public Collection<String> getColumnNames() {
-        return Arrays.asList("item", "data_source", "status", "active", "processed_records_count", "inventory_finished_percentage", "incremental_idle_seconds", "error_message");
+        return Arrays.asList("item", "data_source", "tables", "status", "active", "processed_records_count", "inventory_finished_percentage", "incremental_idle_seconds", "error_message");
     }
     
     @Override
diff --git a/kernel/data-pipeline/distsql/handler/src/test/java/org/apache/shardingsphere/migration/distsql/handler/query/ShowMigrationCheckStatusExecutorTest.java b/kernel/data-pipeline/distsql/handler/src/test/java/org/apache/shardingsphere/migration/distsql/handler/query/ShowMigrationCheckStatusExecutorTest.java
index 98be3eb3316..a6abd2511a9 100644
--- a/kernel/data-pipeline/distsql/handler/src/test/java/org/apache/shardingsphere/migration/distsql/handler/query/ShowMigrationCheckStatusExecutorTest.java
+++ b/kernel/data-pipeline/distsql/handler/src/test/java/org/apache/shardingsphere/migration/distsql/handler/query/ShowMigrationCheckStatusExecutorTest.java
@@ -17,24 +17,31 @@
 
 package org.apache.shardingsphere.migration.distsql.handler.query;
 
+import lombok.SneakyThrows;
+import org.apache.shardingsphere.data.pipeline.api.pojo.ConsistencyCheckJobItemInfo;
+import org.apache.shardingsphere.infra.merge.result.impl.local.LocalDataQueryResultRow;
 import org.junit.Test;
+import org.mockito.internal.configuration.plugins.Plugins;
 
 import java.util.Collection;
 import java.util.Iterator;
+import java.util.List;
 
 import static org.hamcrest.CoreMatchers.is;
 import static org.hamcrest.MatcherAssert.assertThat;
 
 public final class ShowMigrationCheckStatusExecutorTest {
     
+    private final ShowMigrationCheckStatusExecutor executor = new ShowMigrationCheckStatusExecutor();
+    
     @Test
     public void assertGetColumnNames() {
-        ShowMigrationCheckStatusExecutor executor = new ShowMigrationCheckStatusExecutor();
         Collection<String> columns = executor.getColumnNames();
-        assertThat(columns.size(), is(8));
+        assertThat(columns.size(), is(9));
         Iterator<String> iterator = columns.iterator();
         assertThat(iterator.next(), is("tables"));
         assertThat(iterator.next(), is("result"));
+        assertThat(iterator.next(), is("check_failed_tables"));
         assertThat(iterator.next(), is("finished_percentage"));
         assertThat(iterator.next(), is("remaining_seconds"));
         assertThat(iterator.next(), is("check_begin_time"));
@@ -42,4 +49,22 @@ public final class ShowMigrationCheckStatusExecutorTest {
         assertThat(iterator.next(), is("duration_seconds"));
         assertThat(iterator.next(), is("error_message"));
     }
+    
+    @Test
+    @SneakyThrows(ReflectiveOperationException.class)
+    @SuppressWarnings("unchecked")
+    public void assertConvert() {
+        ConsistencyCheckJobItemInfo jobItemInfo = new ConsistencyCheckJobItemInfo();
+        jobItemInfo.setTableNames("t_order");
+        jobItemInfo.setCheckSuccess(true);
+        jobItemInfo.setCheckFailedTableNames(null);
+        jobItemInfo.setFinishedPercentage(100);
+        jobItemInfo.setRemainingSeconds(0);
+        jobItemInfo.setCheckBeginTime("");
+        jobItemInfo.setCheckEndTime("");
+        jobItemInfo.setDurationSeconds(1);
+        LocalDataQueryResultRow row = executor.convert(jobItemInfo);
+        List<Object> actual = (List<Object>) Plugins.getMemberAccessor().get(LocalDataQueryResultRow.class.getDeclaredField("data"), row);
+        assertThat(actual.size(), is(executor.getColumnNames().size()));
+    }
 }
diff --git a/kernel/data-pipeline/distsql/handler/src/test/java/org/apache/shardingsphere/migration/distsql/handler/query/ShowMigrationJobStatusExecutorTest.java b/kernel/data-pipeline/distsql/handler/src/test/java/org/apache/shardingsphere/migration/distsql/handler/query/ShowMigrationJobStatusExecutorTest.java
index ad4200b6078..9feeb7a35d9 100644
--- a/kernel/data-pipeline/distsql/handler/src/test/java/org/apache/shardingsphere/migration/distsql/handler/query/ShowMigrationJobStatusExecutorTest.java
+++ b/kernel/data-pipeline/distsql/handler/src/test/java/org/apache/shardingsphere/migration/distsql/handler/query/ShowMigrationJobStatusExecutorTest.java
@@ -17,24 +17,31 @@
 
 package org.apache.shardingsphere.migration.distsql.handler.query;
 
+import lombok.SneakyThrows;
+import org.apache.shardingsphere.data.pipeline.api.pojo.InventoryIncrementalJobItemInfo;
+import org.apache.shardingsphere.infra.merge.result.impl.local.LocalDataQueryResultRow;
 import org.junit.Test;
+import org.mockito.internal.configuration.plugins.Plugins;
 
 import java.util.Collection;
 import java.util.Iterator;
+import java.util.List;
 
 import static org.hamcrest.CoreMatchers.is;
 import static org.hamcrest.MatcherAssert.assertThat;
 
 public final class ShowMigrationJobStatusExecutorTest {
     
+    private final ShowMigrationJobStatusExecutor executor = new ShowMigrationJobStatusExecutor();
+    
     @Test
     public void assertGetColumnNames() {
-        ShowMigrationJobStatusExecutor executor = new ShowMigrationJobStatusExecutor();
         Collection<String> columns = executor.getColumnNames();
-        assertThat(columns.size(), is(8));
+        assertThat(columns.size(), is(9));
         Iterator<String> iterator = columns.iterator();
         assertThat(iterator.next(), is("item"));
         assertThat(iterator.next(), is("data_source"));
+        assertThat(iterator.next(), is("tables"));
         assertThat(iterator.next(), is("status"));
         assertThat(iterator.next(), is("active"));
         assertThat(iterator.next(), is("processed_records_count"));
@@ -42,4 +49,14 @@ public final class ShowMigrationJobStatusExecutorTest {
         assertThat(iterator.next(), is("incremental_idle_seconds"));
         assertThat(iterator.next(), is("error_message"));
     }
+    
+    @Test
+    @SneakyThrows(ReflectiveOperationException.class)
+    @SuppressWarnings("unchecked")
+    public void assertGenerateResultRowWithNullJobItemProgress() {
+        InventoryIncrementalJobItemInfo jobItemInfo = new InventoryIncrementalJobItemInfo(0, "t_order", null, System.currentTimeMillis(), 0, null);
+        LocalDataQueryResultRow row = executor.generateResultRow(jobItemInfo, System.currentTimeMillis());
+        List<Object> actual = (List<Object>) Plugins.getMemberAccessor().get(LocalDataQueryResultRow.class.getDeclaredField("data"), row);
+        assertThat(actual.size(), is(executor.getColumnNames().size()));
+    }
 }
diff --git a/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/api/impl/ConsistencyCheckJobAPI.java b/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/api/impl/ConsistencyCheckJobAPI.java
index b5221009179..03a3f49ce14 100644
--- a/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/api/impl/ConsistencyCheckJobAPI.java
+++ b/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/api/impl/ConsistencyCheckJobAPI.java
@@ -67,6 +67,7 @@ import java.util.Collections;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
+import java.util.Map.Entry;
 import java.util.Objects;
 import java.util.Optional;
 import java.util.concurrent.TimeUnit;
@@ -308,6 +309,8 @@ public final class ConsistencyCheckJobAPI extends AbstractPipelineJobAPIImpl {
         InventoryIncrementalJobAPI inventoryIncrementalJobAPI = (InventoryIncrementalJobAPI) TypedSPILoader.getService(
                 PipelineJobAPI.class, PipelineJobIdUtils.parseJobType(parentJobId).getTypeName());
         result.setCheckSuccess(inventoryIncrementalJobAPI.aggregateDataConsistencyCheckResults(parentJobId, checkJobResult));
+        result.setCheckFailedTableNames(checkJobResult.entrySet().stream().filter(each -> !each.getValue().isIgnored() && !each.getValue().isMatched())
+                .map(Entry::getKey).collect(Collectors.joining(",")));
         return result;
     }
     
diff --git a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/impl/MigrationJobAPI.java b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/impl/MigrationJobAPI.java
index 73a9eab3fbd..9de4b5420b3 100644
--- a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/impl/MigrationJobAPI.java
+++ b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/impl/MigrationJobAPI.java
@@ -152,6 +152,8 @@ public final class MigrationJobAPI extends AbstractInventoryIncrementalJobAPIImp
             if (configSources.containsKey(dataSourceName)) {
                 continue;
             }
+            ShardingSpherePreconditions.checkState(metaDataDataSource.containsKey(dataSourceName),
+                    () -> new PipelineInvalidParameterException(dataSourceName + " doesn't exist. Run `SHOW MIGRATION SOURCE STORAGE UNITS;` to verify it."));
             Map<String, Object> sourceDataSourceProps = swapper.swapToMap(metaDataDataSource.get(dataSourceName));
             StandardPipelineDataSourceConfiguration sourceDataSourceConfig = new StandardPipelineDataSourceConfiguration(sourceDataSourceProps);
             configSources.put(dataSourceName, buildYamlPipelineDataSourceConfiguration(sourceDataSourceConfig.getType(), sourceDataSourceConfig.getParameter()));
@@ -224,8 +226,7 @@ public final class MigrationJobAPI extends AbstractInventoryIncrementalJobAPIImp
     @Override
     protected TableBasedPipelineJobInfo getJobInfo(final String jobId) {
         JobConfigurationPOJO jobConfigPOJO = getElasticJobConfigPOJO(jobId);
-        PipelineJobMetaData jobMetaData = new PipelineJobMetaData(jobId, !jobConfigPOJO.isDisabled(),
-                jobConfigPOJO.getShardingTotalCount(), jobConfigPOJO.getProps().getProperty("create_time"), jobConfigPOJO.getProps().getProperty("stop_time"), jobConfigPOJO.getJobParameter());
+        PipelineJobMetaData jobMetaData = buildPipelineJobMetaData(jobConfigPOJO);
         List<String> sourceTables = new LinkedList<>();
         getJobConfiguration(jobConfigPOJO).getJobShardingDataNodes().forEach(each -> each.getEntries().forEach(entry -> entry.getDataNodes()
                 .forEach(dataNode -> sourceTables.add(DataNodeUtil.formatWithSchema(dataNode)))));
diff --git a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/migration/api/impl/MigrationJobAPITest.java b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/migration/api/impl/MigrationJobAPITest.java
index c62bc32c71a..ffafc73dc51 100644
--- a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/migration/api/impl/MigrationJobAPITest.java
+++ b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/migration/api/impl/MigrationJobAPITest.java
@@ -33,6 +33,7 @@ import org.apache.shardingsphere.data.pipeline.api.pojo.InventoryIncrementalJobI
 import org.apache.shardingsphere.data.pipeline.core.api.PipelineAPIFactory;
 import org.apache.shardingsphere.data.pipeline.core.api.impl.PipelineDataSourcePersistService;
 import org.apache.shardingsphere.data.pipeline.core.check.consistency.ConsistencyCheckJobItemProgressContext;
+import org.apache.shardingsphere.data.pipeline.core.exception.param.PipelineInvalidParameterException;
 import org.apache.shardingsphere.data.pipeline.core.job.progress.yaml.YamlInventoryIncrementalJobItemProgress;
 import org.apache.shardingsphere.data.pipeline.core.util.PipelineDistributedBarrier;
 import org.apache.shardingsphere.data.pipeline.scenario.migration.MigrationJobType;
@@ -71,12 +72,15 @@ import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Optional;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
 
 import static org.hamcrest.CoreMatchers.is;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertNull;
 import static org.junit.jupiter.api.Assertions.assertSame;
+import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
@@ -271,6 +275,19 @@ public final class MigrationJobAPITest {
         assertTrue(actual.containsKey("ds_0"));
     }
     
+    @Test
+    public void assertCreateJobConfigFailedOnMoreThanOneSourceTable() {
+        List<SourceTargetEntry> sourceTargetEntries = Stream.of("t_order_0", "t_order_1")
+                .map(each -> new SourceTargetEntry("logic_db", new DataNode("ds_0", each), "t_order")).collect(Collectors.toList());
+        assertThrows(PipelineInvalidParameterException.class, () -> jobAPI.createJobAndStart(new MigrateTableStatement(sourceTargetEntries, "logic_db")));
+    }
+    
+    @Test
+    public void assertCreateJobConfigFailedOnDataSourceNotExist() {
+        List<SourceTargetEntry> sourceTargetEntries = Collections.singletonList(new SourceTargetEntry("logic_db", new DataNode("ds_not_exists", "t_order"), "t_order"));
+        assertThrows(PipelineInvalidParameterException.class, () -> jobAPI.createJobAndStart(new MigrateTableStatement(sourceTargetEntries, "logic_db")));
+    }
+    
     @Test
     public void assertCreateJobConfig() throws SQLException {
         initIntPrimaryEnvironment();