You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@shardingsphere.apache.org by az...@apache.org on 2023/03/03 11:21:34 UTC
[shardingsphere] branch master updated: Improve migrate table param check; Add column in show migration DistSQLs (#24442)
This is an automated email from the ASF dual-hosted git repository.
azexin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new 334a409201d Improve migrate table param check; Add column in show migration DistSQLs (#24442)
334a409201d is described below
commit 334a409201d91c1b95b8065a0ac15193a5d25ded
Author: Hongsheng Zhong <zh...@apache.org>
AuthorDate: Fri Mar 3 19:21:26 2023 +0800
Improve migrate table param check; Add column in show migration DistSQLs (#24442)
* Check dataSourceName exists before migrating table
* Add invalid-parameter unit tests for MigrationJobAPI.createJobAndStart
* Add tables column in SHOW MIGRATION STATUS; Refactor getJobProgress and getJobInfo
* Add check_failed_tables column in SHOW MIGRATION CHECK STATUS
---
.../api/pojo/ConsistencyCheckJobItemInfo.java | 2 ++
.../api/pojo/InventoryIncrementalJobItemInfo.java | 2 ++
.../api/pojo/TableBasedPipelineJobInfo.java | 1 +
.../data/pipeline/cdc/api/impl/CDCJobAPI.java | 6 ++---
.../AbstractInventoryIncrementalJobAPIImpl.java | 8 ++++--
.../core/api/impl/AbstractPipelineJobAPIImpl.java | 7 ++++++
.../query/ShowMigrationCheckStatusExecutor.java | 6 ++---
.../query/ShowMigrationJobStatusExecutor.java | 8 +++---
.../ShowMigrationCheckStatusExecutorTest.java | 29 ++++++++++++++++++++--
.../query/ShowMigrationJobStatusExecutorTest.java | 21 ++++++++++++++--
.../api/impl/ConsistencyCheckJobAPI.java | 3 +++
.../migration/api/impl/MigrationJobAPI.java | 5 ++--
.../migration/api/impl/MigrationJobAPITest.java | 17 +++++++++++++
13 files changed, 96 insertions(+), 19 deletions(-)
diff --git a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/pojo/ConsistencyCheckJobItemInfo.java b/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/pojo/ConsistencyCheckJobItemInfo.java
index 76376460629..4642a599b63 100644
--- a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/pojo/ConsistencyCheckJobItemInfo.java
+++ b/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/pojo/ConsistencyCheckJobItemInfo.java
@@ -33,6 +33,8 @@ public final class ConsistencyCheckJobItemInfo {
private Boolean checkSuccess;
+ private String checkFailedTableNames;
+
private int finishedPercentage;
private long remainingSeconds;
diff --git a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/pojo/InventoryIncrementalJobItemInfo.java b/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/pojo/InventoryIncrementalJobItemInfo.java
index 117cdcbaac3..9ff478a61e4 100644
--- a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/pojo/InventoryIncrementalJobItemInfo.java
+++ b/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/pojo/InventoryIncrementalJobItemInfo.java
@@ -30,6 +30,8 @@ public class InventoryIncrementalJobItemInfo {
private final int shardingItem;
+ private final String tableNames;
+
private final InventoryIncrementalJobItemProgress jobItemProgress;
private final long startTimeMillis;
diff --git a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/pojo/TableBasedPipelineJobInfo.java b/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/pojo/TableBasedPipelineJobInfo.java
index ef321c784e4..729f0caabf9 100644
--- a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/pojo/TableBasedPipelineJobInfo.java
+++ b/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/pojo/TableBasedPipelineJobInfo.java
@@ -31,6 +31,7 @@ public final class TableBasedPipelineJobInfo implements PipelineJobInfo {
private final String databaseName;
+ // TODO Rename
private final String table;
public TableBasedPipelineJobInfo(final PipelineJobMetaData jobMetaData, final String table) {
diff --git a/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPI.java b/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPI.java
index 6f644730758..c43f7cda3a2 100644
--- a/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPI.java
+++ b/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPI.java
@@ -41,7 +41,6 @@ import org.apache.shardingsphere.data.pipeline.api.job.progress.InventoryIncreme
import org.apache.shardingsphere.data.pipeline.api.job.progress.JobItemIncrementalTasksProgress;
import org.apache.shardingsphere.data.pipeline.api.metadata.ActualTableName;
import org.apache.shardingsphere.data.pipeline.api.metadata.LogicTableName;
-import org.apache.shardingsphere.data.pipeline.api.pojo.PipelineJobInfo;
import org.apache.shardingsphere.data.pipeline.api.pojo.PipelineJobMetaData;
import org.apache.shardingsphere.data.pipeline.api.pojo.TableBasedPipelineJobInfo;
import org.apache.shardingsphere.data.pipeline.api.task.progress.IncrementalTaskProgress;
@@ -285,10 +284,9 @@ public final class CDCJobAPI extends AbstractInventoryIncrementalJobAPIImpl {
}
@Override
- protected PipelineJobInfo getJobInfo(final String jobId) {
+ protected TableBasedPipelineJobInfo getJobInfo(final String jobId) {
JobConfigurationPOJO jobConfigPOJO = getElasticJobConfigPOJO(jobId);
- PipelineJobMetaData jobMetaData = new PipelineJobMetaData(jobId, !jobConfigPOJO.isDisabled(),
- jobConfigPOJO.getShardingTotalCount(), jobConfigPOJO.getProps().getProperty("create_time"), jobConfigPOJO.getProps().getProperty("stop_time"), jobConfigPOJO.getJobParameter());
+ PipelineJobMetaData jobMetaData = buildPipelineJobMetaData(jobConfigPOJO);
CDCJobConfiguration jobConfig = getJobConfiguration(jobConfigPOJO);
return new TableBasedPipelineJobInfo(jobMetaData, jobConfig.getDatabaseName(), String.join(", ", jobConfig.getSchemaTableNames()));
}
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/AbstractInventoryIncrementalJobAPIImpl.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/AbstractInventoryIncrementalJobAPIImpl.java
index 8188d0cc8e3..d2a67063403 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/AbstractInventoryIncrementalJobAPIImpl.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/AbstractInventoryIncrementalJobAPIImpl.java
@@ -32,6 +32,7 @@ import org.apache.shardingsphere.data.pipeline.api.job.progress.JobItemInventory
import org.apache.shardingsphere.data.pipeline.api.job.progress.JobOffsetInfo;
import org.apache.shardingsphere.data.pipeline.api.pojo.DataConsistencyCheckAlgorithmInfo;
import org.apache.shardingsphere.data.pipeline.api.pojo.InventoryIncrementalJobItemInfo;
+import org.apache.shardingsphere.data.pipeline.api.pojo.TableBasedPipelineJobInfo;
import org.apache.shardingsphere.data.pipeline.api.task.progress.InventoryTaskProgress;
import org.apache.shardingsphere.data.pipeline.core.api.InventoryIncrementalJobAPI;
import org.apache.shardingsphere.data.pipeline.core.api.PipelineAPIFactory;
@@ -98,6 +99,8 @@ public abstract class AbstractInventoryIncrementalJobAPIImpl extends AbstractPip
return result;
}
+ protected abstract TableBasedPipelineJobInfo getJobInfo(String jobId);
+
@Override
public Map<Integer, InventoryIncrementalJobItemProgress> getJobProgress(final PipelineJobConfiguration jobConfig) {
String jobId = jobConfig.getJobId();
@@ -118,9 +121,10 @@ public abstract class AbstractInventoryIncrementalJobAPIImpl extends AbstractPip
List<InventoryIncrementalJobItemInfo> result = new LinkedList<>();
for (Entry<Integer, InventoryIncrementalJobItemProgress> entry : jobProgress.entrySet()) {
int shardingItem = entry.getKey();
+ TableBasedPipelineJobInfo jobInfo = getJobInfo(jobId);
InventoryIncrementalJobItemProgress jobItemProgress = entry.getValue();
if (null == jobItemProgress) {
- result.add(new InventoryIncrementalJobItemInfo(shardingItem, null, startTimeMillis, 0, ""));
+ result.add(new InventoryIncrementalJobItemInfo(shardingItem, jobInfo.getTable(), null, startTimeMillis, 0, ""));
continue;
}
int inventoryFinishedPercentage = 0;
@@ -130,7 +134,7 @@ public abstract class AbstractInventoryIncrementalJobAPIImpl extends AbstractPip
inventoryFinishedPercentage = (int) Math.min(100, jobItemProgress.getProcessedRecordsCount() * 100 / jobItemProgress.getInventoryRecordsCount());
}
String errorMessage = getJobItemErrorMessage(jobId, shardingItem);
- result.add(new InventoryIncrementalJobItemInfo(shardingItem, jobItemProgress, startTimeMillis, inventoryFinishedPercentage, errorMessage));
+ result.add(new InventoryIncrementalJobItemInfo(shardingItem, jobInfo.getTable(), jobItemProgress, startTimeMillis, inventoryFinishedPercentage, errorMessage));
}
return result;
}
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/AbstractPipelineJobAPIImpl.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/AbstractPipelineJobAPIImpl.java
index f774125c855..919e3fc09ba 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/AbstractPipelineJobAPIImpl.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/AbstractPipelineJobAPIImpl.java
@@ -23,6 +23,7 @@ import org.apache.shardingsphere.data.pipeline.api.config.job.PipelineJobConfigu
import org.apache.shardingsphere.data.pipeline.api.config.job.yaml.YamlPipelineJobConfiguration;
import org.apache.shardingsphere.data.pipeline.api.job.PipelineJobId;
import org.apache.shardingsphere.data.pipeline.api.pojo.PipelineJobInfo;
+import org.apache.shardingsphere.data.pipeline.api.pojo.PipelineJobMetaData;
import org.apache.shardingsphere.data.pipeline.core.api.GovernanceRepositoryAPI;
import org.apache.shardingsphere.data.pipeline.core.api.PipelineAPIFactory;
import org.apache.shardingsphere.data.pipeline.core.api.PipelineJobAPI;
@@ -81,8 +82,14 @@ public abstract class AbstractPipelineJobAPIImpl implements PipelineJobAPI {
.filter(each -> PipelineJobIdUtils.parseJobType(each.getJobName()).getTypeCode().equals(getJobType().getTypeCode()));
}
+ // TODO Add getJobInfo
protected abstract PipelineJobInfo getJobInfo(String jobId);
+ protected PipelineJobMetaData buildPipelineJobMetaData(final JobConfigurationPOJO jobConfigPOJO) {
+ return new PipelineJobMetaData(jobConfigPOJO.getJobName(), !jobConfigPOJO.isDisabled(),
+ jobConfigPOJO.getShardingTotalCount(), jobConfigPOJO.getProps().getProperty("create_time"), jobConfigPOJO.getProps().getProperty("stop_time"), jobConfigPOJO.getJobParameter());
+ }
+
@Override
public Optional<String> start(final PipelineJobConfiguration jobConfig) {
String jobId = jobConfig.getJobId();
diff --git a/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/query/ShowMigrationCheckStatusExecutor.java b/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/query/ShowMigrationCheckStatusExecutor.java
index 54941510d76..bba29e519eb 100644
--- a/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/query/ShowMigrationCheckStatusExecutor.java
+++ b/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/query/ShowMigrationCheckStatusExecutor.java
@@ -46,9 +46,9 @@ public final class ShowMigrationCheckStatusExecutor implements QueryableRALExecu
return result;
}
- private LocalDataQueryResultRow convert(final ConsistencyCheckJobItemInfo info) {
+ LocalDataQueryResultRow convert(final ConsistencyCheckJobItemInfo info) {
String checkResult = null == info.getCheckSuccess() ? "" : info.getCheckSuccess().toString();
- return new LocalDataQueryResultRow(Optional.ofNullable(info.getTableNames()).orElse(""), checkResult,
+ return new LocalDataQueryResultRow(Optional.ofNullable(info.getTableNames()).orElse(""), checkResult, Optional.ofNullable(info.getCheckFailedTableNames()).orElse(""),
String.valueOf(info.getFinishedPercentage()), info.getRemainingSeconds(),
Optional.ofNullable(info.getCheckBeginTime()).orElse(""), Optional.ofNullable(info.getCheckEndTime()).orElse(""),
info.getDurationSeconds(), Optional.ofNullable(info.getErrorMessage()).orElse(""));
@@ -56,7 +56,7 @@ public final class ShowMigrationCheckStatusExecutor implements QueryableRALExecu
@Override
public Collection<String> getColumnNames() {
- return Arrays.asList("tables", "result", "finished_percentage", "remaining_seconds", "check_begin_time", "check_end_time", "duration_seconds", "error_message");
+ return Arrays.asList("tables", "result", "check_failed_tables", "finished_percentage", "remaining_seconds", "check_begin_time", "check_end_time", "duration_seconds", "error_message");
}
@Override
diff --git a/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/query/ShowMigrationJobStatusExecutor.java b/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/query/ShowMigrationJobStatusExecutor.java
index a9462401b5f..e6b5cd3640b 100644
--- a/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/query/ShowMigrationJobStatusExecutor.java
+++ b/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/query/ShowMigrationJobStatusExecutor.java
@@ -45,24 +45,24 @@ public final class ShowMigrationJobStatusExecutor implements QueryableRALExecuto
return jobItemInfos.stream().map(each -> generateResultRow(each, currentTimeMillis)).collect(Collectors.toList());
}
- private LocalDataQueryResultRow generateResultRow(final InventoryIncrementalJobItemInfo jobItemInfo, final long currentTimeMillis) {
+ LocalDataQueryResultRow generateResultRow(final InventoryIncrementalJobItemInfo jobItemInfo, final long currentTimeMillis) {
InventoryIncrementalJobItemProgress jobItemProgress = jobItemInfo.getJobItemProgress();
if (null == jobItemProgress) {
- return new LocalDataQueryResultRow(jobItemInfo.getShardingItem(), "", "", "", "", "", "", jobItemInfo.getErrorMessage());
+ return new LocalDataQueryResultRow(jobItemInfo.getShardingItem(), "", "", "", "", "", "", "", jobItemInfo.getErrorMessage());
}
String incrementalIdleSeconds = "";
if (jobItemProgress.getIncremental().getIncrementalLatestActiveTimeMillis() > 0) {
long latestActiveTimeMillis = Math.max(jobItemInfo.getStartTimeMillis(), jobItemProgress.getIncremental().getIncrementalLatestActiveTimeMillis());
incrementalIdleSeconds = String.valueOf(TimeUnit.MILLISECONDS.toSeconds(currentTimeMillis - latestActiveTimeMillis));
}
- return new LocalDataQueryResultRow(jobItemInfo.getShardingItem(), jobItemProgress.getDataSourceName(), jobItemProgress.getStatus(),
+ return new LocalDataQueryResultRow(jobItemInfo.getShardingItem(), jobItemProgress.getDataSourceName(), jobItemInfo.getTableNames(), jobItemProgress.getStatus(),
jobItemProgress.isActive() ? Boolean.TRUE.toString() : Boolean.FALSE.toString(), jobItemProgress.getProcessedRecordsCount(), jobItemInfo.getInventoryFinishedPercentage(),
incrementalIdleSeconds, jobItemInfo.getErrorMessage());
}
@Override
public Collection<String> getColumnNames() {
- return Arrays.asList("item", "data_source", "status", "active", "processed_records_count", "inventory_finished_percentage", "incremental_idle_seconds", "error_message");
+ return Arrays.asList("item", "data_source", "tables", "status", "active", "processed_records_count", "inventory_finished_percentage", "incremental_idle_seconds", "error_message");
}
@Override
diff --git a/kernel/data-pipeline/distsql/handler/src/test/java/org/apache/shardingsphere/migration/distsql/handler/query/ShowMigrationCheckStatusExecutorTest.java b/kernel/data-pipeline/distsql/handler/src/test/java/org/apache/shardingsphere/migration/distsql/handler/query/ShowMigrationCheckStatusExecutorTest.java
index 98be3eb3316..a6abd2511a9 100644
--- a/kernel/data-pipeline/distsql/handler/src/test/java/org/apache/shardingsphere/migration/distsql/handler/query/ShowMigrationCheckStatusExecutorTest.java
+++ b/kernel/data-pipeline/distsql/handler/src/test/java/org/apache/shardingsphere/migration/distsql/handler/query/ShowMigrationCheckStatusExecutorTest.java
@@ -17,24 +17,31 @@
package org.apache.shardingsphere.migration.distsql.handler.query;
+import lombok.SneakyThrows;
+import org.apache.shardingsphere.data.pipeline.api.pojo.ConsistencyCheckJobItemInfo;
+import org.apache.shardingsphere.infra.merge.result.impl.local.LocalDataQueryResultRow;
import org.junit.Test;
+import org.mockito.internal.configuration.plugins.Plugins;
import java.util.Collection;
import java.util.Iterator;
+import java.util.List;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.MatcherAssert.assertThat;
public final class ShowMigrationCheckStatusExecutorTest {
+ private final ShowMigrationCheckStatusExecutor executor = new ShowMigrationCheckStatusExecutor();
+
@Test
public void assertGetColumnNames() {
- ShowMigrationCheckStatusExecutor executor = new ShowMigrationCheckStatusExecutor();
Collection<String> columns = executor.getColumnNames();
- assertThat(columns.size(), is(8));
+ assertThat(columns.size(), is(9));
Iterator<String> iterator = columns.iterator();
assertThat(iterator.next(), is("tables"));
assertThat(iterator.next(), is("result"));
+ assertThat(iterator.next(), is("check_failed_tables"));
assertThat(iterator.next(), is("finished_percentage"));
assertThat(iterator.next(), is("remaining_seconds"));
assertThat(iterator.next(), is("check_begin_time"));
@@ -42,4 +49,22 @@ public final class ShowMigrationCheckStatusExecutorTest {
assertThat(iterator.next(), is("duration_seconds"));
assertThat(iterator.next(), is("error_message"));
}
+
+ @Test
+ @SneakyThrows(ReflectiveOperationException.class)
+ @SuppressWarnings("unchecked")
+ public void assertConvert() {
+ ConsistencyCheckJobItemInfo jobItemInfo = new ConsistencyCheckJobItemInfo();
+ jobItemInfo.setTableNames("t_order");
+ jobItemInfo.setCheckSuccess(true);
+ jobItemInfo.setCheckFailedTableNames(null);
+ jobItemInfo.setFinishedPercentage(100);
+ jobItemInfo.setRemainingSeconds(0);
+ jobItemInfo.setCheckBeginTime("");
+ jobItemInfo.setCheckEndTime("");
+ jobItemInfo.setDurationSeconds(1);
+ LocalDataQueryResultRow row = executor.convert(jobItemInfo);
+ List<Object> actual = (List<Object>) Plugins.getMemberAccessor().get(LocalDataQueryResultRow.class.getDeclaredField("data"), row);
+ assertThat(actual.size(), is(executor.getColumnNames().size()));
+ }
}
diff --git a/kernel/data-pipeline/distsql/handler/src/test/java/org/apache/shardingsphere/migration/distsql/handler/query/ShowMigrationJobStatusExecutorTest.java b/kernel/data-pipeline/distsql/handler/src/test/java/org/apache/shardingsphere/migration/distsql/handler/query/ShowMigrationJobStatusExecutorTest.java
index ad4200b6078..9feeb7a35d9 100644
--- a/kernel/data-pipeline/distsql/handler/src/test/java/org/apache/shardingsphere/migration/distsql/handler/query/ShowMigrationJobStatusExecutorTest.java
+++ b/kernel/data-pipeline/distsql/handler/src/test/java/org/apache/shardingsphere/migration/distsql/handler/query/ShowMigrationJobStatusExecutorTest.java
@@ -17,24 +17,31 @@
package org.apache.shardingsphere.migration.distsql.handler.query;
+import lombok.SneakyThrows;
+import org.apache.shardingsphere.data.pipeline.api.pojo.InventoryIncrementalJobItemInfo;
+import org.apache.shardingsphere.infra.merge.result.impl.local.LocalDataQueryResultRow;
import org.junit.Test;
+import org.mockito.internal.configuration.plugins.Plugins;
import java.util.Collection;
import java.util.Iterator;
+import java.util.List;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.MatcherAssert.assertThat;
public final class ShowMigrationJobStatusExecutorTest {
+ private final ShowMigrationJobStatusExecutor executor = new ShowMigrationJobStatusExecutor();
+
@Test
public void assertGetColumnNames() {
- ShowMigrationJobStatusExecutor executor = new ShowMigrationJobStatusExecutor();
Collection<String> columns = executor.getColumnNames();
- assertThat(columns.size(), is(8));
+ assertThat(columns.size(), is(9));
Iterator<String> iterator = columns.iterator();
assertThat(iterator.next(), is("item"));
assertThat(iterator.next(), is("data_source"));
+ assertThat(iterator.next(), is("tables"));
assertThat(iterator.next(), is("status"));
assertThat(iterator.next(), is("active"));
assertThat(iterator.next(), is("processed_records_count"));
@@ -42,4 +49,14 @@ public final class ShowMigrationJobStatusExecutorTest {
assertThat(iterator.next(), is("incremental_idle_seconds"));
assertThat(iterator.next(), is("error_message"));
}
+
+ @Test
+ @SneakyThrows(ReflectiveOperationException.class)
+ @SuppressWarnings("unchecked")
+ public void assertGenerateResultRowWithNullJobItemProgress() {
+ InventoryIncrementalJobItemInfo jobItemInfo = new InventoryIncrementalJobItemInfo(0, "t_order", null, System.currentTimeMillis(), 0, null);
+ LocalDataQueryResultRow row = executor.generateResultRow(jobItemInfo, System.currentTimeMillis());
+ List<Object> actual = (List<Object>) Plugins.getMemberAccessor().get(LocalDataQueryResultRow.class.getDeclaredField("data"), row);
+ assertThat(actual.size(), is(executor.getColumnNames().size()));
+ }
}
diff --git a/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/api/impl/ConsistencyCheckJobAPI.java b/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/api/impl/ConsistencyCheckJobAPI.java
index b5221009179..03a3f49ce14 100644
--- a/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/api/impl/ConsistencyCheckJobAPI.java
+++ b/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/api/impl/ConsistencyCheckJobAPI.java
@@ -67,6 +67,7 @@ import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
+import java.util.Map.Entry;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
@@ -308,6 +309,8 @@ public final class ConsistencyCheckJobAPI extends AbstractPipelineJobAPIImpl {
InventoryIncrementalJobAPI inventoryIncrementalJobAPI = (InventoryIncrementalJobAPI) TypedSPILoader.getService(
PipelineJobAPI.class, PipelineJobIdUtils.parseJobType(parentJobId).getTypeName());
result.setCheckSuccess(inventoryIncrementalJobAPI.aggregateDataConsistencyCheckResults(parentJobId, checkJobResult));
+ result.setCheckFailedTableNames(checkJobResult.entrySet().stream().filter(each -> !each.getValue().isIgnored() && !each.getValue().isMatched())
+ .map(Entry::getKey).collect(Collectors.joining(",")));
return result;
}
diff --git a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/impl/MigrationJobAPI.java b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/impl/MigrationJobAPI.java
index 73a9eab3fbd..9de4b5420b3 100644
--- a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/impl/MigrationJobAPI.java
+++ b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/impl/MigrationJobAPI.java
@@ -152,6 +152,8 @@ public final class MigrationJobAPI extends AbstractInventoryIncrementalJobAPIImp
if (configSources.containsKey(dataSourceName)) {
continue;
}
+ ShardingSpherePreconditions.checkState(metaDataDataSource.containsKey(dataSourceName),
+ () -> new PipelineInvalidParameterException(dataSourceName + " doesn't exist. Run `SHOW MIGRATION SOURCE STORAGE UNITS;` to verify it."));
Map<String, Object> sourceDataSourceProps = swapper.swapToMap(metaDataDataSource.get(dataSourceName));
StandardPipelineDataSourceConfiguration sourceDataSourceConfig = new StandardPipelineDataSourceConfiguration(sourceDataSourceProps);
configSources.put(dataSourceName, buildYamlPipelineDataSourceConfiguration(sourceDataSourceConfig.getType(), sourceDataSourceConfig.getParameter()));
@@ -224,8 +226,7 @@ public final class MigrationJobAPI extends AbstractInventoryIncrementalJobAPIImp
@Override
protected TableBasedPipelineJobInfo getJobInfo(final String jobId) {
JobConfigurationPOJO jobConfigPOJO = getElasticJobConfigPOJO(jobId);
- PipelineJobMetaData jobMetaData = new PipelineJobMetaData(jobId, !jobConfigPOJO.isDisabled(),
- jobConfigPOJO.getShardingTotalCount(), jobConfigPOJO.getProps().getProperty("create_time"), jobConfigPOJO.getProps().getProperty("stop_time"), jobConfigPOJO.getJobParameter());
+ PipelineJobMetaData jobMetaData = buildPipelineJobMetaData(jobConfigPOJO);
List<String> sourceTables = new LinkedList<>();
getJobConfiguration(jobConfigPOJO).getJobShardingDataNodes().forEach(each -> each.getEntries().forEach(entry -> entry.getDataNodes()
.forEach(dataNode -> sourceTables.add(DataNodeUtil.formatWithSchema(dataNode)))));
diff --git a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/migration/api/impl/MigrationJobAPITest.java b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/migration/api/impl/MigrationJobAPITest.java
index c62bc32c71a..ffafc73dc51 100644
--- a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/migration/api/impl/MigrationJobAPITest.java
+++ b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/migration/api/impl/MigrationJobAPITest.java
@@ -33,6 +33,7 @@ import org.apache.shardingsphere.data.pipeline.api.pojo.InventoryIncrementalJobI
import org.apache.shardingsphere.data.pipeline.core.api.PipelineAPIFactory;
import org.apache.shardingsphere.data.pipeline.core.api.impl.PipelineDataSourcePersistService;
import org.apache.shardingsphere.data.pipeline.core.check.consistency.ConsistencyCheckJobItemProgressContext;
+import org.apache.shardingsphere.data.pipeline.core.exception.param.PipelineInvalidParameterException;
import org.apache.shardingsphere.data.pipeline.core.job.progress.yaml.YamlInventoryIncrementalJobItemProgress;
import org.apache.shardingsphere.data.pipeline.core.util.PipelineDistributedBarrier;
import org.apache.shardingsphere.data.pipeline.scenario.migration.MigrationJobType;
@@ -71,12 +72,15 @@ import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Optional;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertSame;
+import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
@@ -271,6 +275,19 @@ public final class MigrationJobAPITest {
assertTrue(actual.containsKey("ds_0"));
}
+ @Test
+ public void assertCreateJobConfigFailedOnMoreThanOneSourceTable() {
+ List<SourceTargetEntry> sourceTargetEntries = Stream.of("t_order_0", "t_order_1")
+ .map(each -> new SourceTargetEntry("logic_db", new DataNode("ds_0", each), "t_order")).collect(Collectors.toList());
+ assertThrows(PipelineInvalidParameterException.class, () -> jobAPI.createJobAndStart(new MigrateTableStatement(sourceTargetEntries, "logic_db")));
+ }
+
+ @Test
+ public void assertCreateJobConfigFailedOnDataSourceNotExist() {
+ List<SourceTargetEntry> sourceTargetEntries = Collections.singletonList(new SourceTargetEntry("logic_db", new DataNode("ds_not_exists", "t_order"), "t_order"));
+ assertThrows(PipelineInvalidParameterException.class, () -> jobAPI.createJobAndStart(new MigrateTableStatement(sourceTargetEntries, "logic_db")));
+ }
+
@Test
public void assertCreateJobConfig() throws SQLException {
initIntPrimaryEnvironment();