You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@shardingsphere.apache.org by zh...@apache.org on 2022/09/19 12:56:14 UTC
[shardingsphere] branch master updated: Show migration status add error_msg column (#21065)
This is an automated email from the ASF dual-hosted git repository.
zhonghongsheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new ead6082cdf8 Show migration status add error_msg column (#21065)
ead6082cdf8 is described below
commit ead6082cdf83b5a673d9fe3e965fa951a3ec556e
Author: Xinze Guo <10...@users.noreply.github.com>
AuthorDate: Mon Sep 19 20:56:07 2022 +0800
Show migration status add error_msg column (#21065)
* Show migration status add error_msg column
* Fix codestyle
---
.../ShowMigrationJobStatusQueryResultSet.java | 9 ++++++--
.../InventoryIncrementalJobItemProgress.java | 2 ++
.../pipeline/core/api/GovernanceRepositoryAPI.java | 17 ++++++++++++++
.../data/pipeline/core/api/PipelineJobAPI.java | 26 ++++++++++++++++++++++
.../core/api/impl/AbstractPipelineJobAPIImpl.java | 26 ++++++++++++++++++++++
.../core/api/impl/GovernanceRepositoryAPIImpl.java | 10 +++++++++
.../core/metadata/node/PipelineMetaDataNode.java | 11 +++++++++
.../core/task/InventoryIncrementalTasksRunner.java | 6 ++++-
.../pipeline/scenario/migration/MigrationJob.java | 6 ++++-
.../scenario/migration/MigrationJobAPIImpl.java | 1 +
.../data/pipeline/cases/base/BaseITCase.java | 3 +++
11 files changed, 113 insertions(+), 4 deletions(-)
diff --git a/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/query/ShowMigrationJobStatusQueryResultSet.java b/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/query/ShowMigrationJobStatusQueryResultSet.java
index 78eb9d3f80d..865b92f4de8 100644
--- a/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/query/ShowMigrationJobStatusQueryResultSet.java
+++ b/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/query/ShowMigrationJobStatusQueryResultSet.java
@@ -19,6 +19,7 @@ package org.apache.shardingsphere.migration.distsql.handler.query;
import org.apache.shardingsphere.data.pipeline.api.MigrationJobPublicAPI;
import org.apache.shardingsphere.data.pipeline.api.PipelineJobPublicAPIFactory;
+import org.apache.shardingsphere.data.pipeline.api.job.progress.InventoryIncrementalJobItemProgress;
import org.apache.shardingsphere.infra.distsql.query.DatabaseDistSQLResultSet;
import org.apache.shardingsphere.infra.metadata.database.ShardingSphereDatabase;
import org.apache.shardingsphere.migration.distsql.statement.ShowMigrationStatusStatement;
@@ -28,6 +29,7 @@ import java.util.Arrays;
import java.util.Collection;
import java.util.Iterator;
import java.util.LinkedList;
+import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
@@ -43,7 +45,8 @@ public final class ShowMigrationJobStatusQueryResultSet implements DatabaseDistS
@Override
public void init(final ShardingSphereDatabase database, final SQLStatement sqlStatement) {
long currentTimeMillis = System.currentTimeMillis();
- data = JOB_API.getJobProgress(((ShowMigrationStatusStatement) sqlStatement).getJobId()).entrySet().stream()
+ Map<Integer, InventoryIncrementalJobItemProgress> jobProgress = JOB_API.getJobProgress(((ShowMigrationStatusStatement) sqlStatement).getJobId());
+ data = jobProgress.entrySet().stream()
.map(entry -> {
Collection<Object> result = new LinkedList<>();
result.add(entry.getKey());
@@ -55,6 +58,7 @@ public final class ShowMigrationJobStatusQueryResultSet implements DatabaseDistS
result.add(entry.getValue().getInventory().getInventoryFinishedPercentage());
long latestActiveTimeMillis = entry.getValue().getIncremental().getIncrementalLatestActiveTimeMillis();
result.add(latestActiveTimeMillis > 0 ? TimeUnit.MILLISECONDS.toSeconds(currentTimeMillis - latestActiveTimeMillis) : 0);
+ result.add(entry.getValue().getErrorMessage());
} else {
result.add("");
result.add("");
@@ -62,6 +66,7 @@ public final class ShowMigrationJobStatusQueryResultSet implements DatabaseDistS
result.add("");
result.add("");
result.add("");
+ result.add("");
}
return result;
}).collect(Collectors.toList()).iterator();
@@ -69,7 +74,7 @@ public final class ShowMigrationJobStatusQueryResultSet implements DatabaseDistS
@Override
public Collection<String> getColumnNames() {
- return Arrays.asList("item", "data_source", "status", "active", "processed_records_count", "inventory_finished_percentage", "incremental_idle_seconds");
+ return Arrays.asList("item", "data_source", "status", "active", "processed_records_count", "inventory_finished_percentage", "incremental_idle_seconds", "error_message");
}
@Override
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/job/progress/InventoryIncrementalJobItemProgress.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/job/progress/InventoryIncrementalJobItemProgress.java
index 7d08abc8788..659f1d90330 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/job/progress/InventoryIncrementalJobItemProgress.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/job/progress/InventoryIncrementalJobItemProgress.java
@@ -41,4 +41,6 @@ public final class InventoryIncrementalJobItemProgress implements PipelineJobIte
private JobItemIncrementalTasksProgress incremental;
private long processedRecordsCount;
+
+ private transient String errorMessage;
}
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/GovernanceRepositoryAPI.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/GovernanceRepositoryAPI.java
index d745f71b1e3..e48f01c66e2 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/GovernanceRepositoryAPI.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/GovernanceRepositoryAPI.java
@@ -140,4 +140,21 @@ public interface GovernanceRepositoryAPI {
* @param processConfigYamlText process configuration YAML text
*/
void persistMetaDataProcessConfiguration(JobType jobType, String processConfigYamlText);
+
+ /**
+ * Get job item error message.
+ *
+ * @param jobId job id
+ * @param shardingItem sharding item
+ * @return error message
+ */
+ String getJobItemErrorMessage(String jobId, int shardingItem);
+
+ /**
+ * Clean job item error message.
+ *
+ * @param jobId job id
+ * @param shardingItem sharding item
+ */
+ void cleanJobItemErrorMessage(String jobId, int shardingItem);
}
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/PipelineJobAPI.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/PipelineJobAPI.java
index 9ac14928a7b..fa31460c16f 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/PipelineJobAPI.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/PipelineJobAPI.java
@@ -83,4 +83,30 @@ public interface PipelineJobAPI extends PipelineJobPublicAPI, PipelineJobItemAPI
* @return job configuration
*/
PipelineJobConfiguration getJobConfiguration(String jobId);
+
+ /**
+ * Get job item error message.
+ *
+ * @param jobId job id
+ * @param shardingItem sharding item
+ * @return error message of the sharding item
+ */
+ String getJobItemErrorMessage(String jobId, int shardingItem);
+
+ /**
+ * Persist job item error message.
+ *
+ * @param jobId job id
+ * @param shardingItem sharding item
+ * @param error error, a throwable or any other object describing the failure
+ */
+ void persistJobItemErrorMessage(String jobId, int shardingItem, Object error);
+
+ /**
+ * Clean job item error message.
+ *
+ * @param jobId job id
+ * @param shardingItem sharding item
+ */
+ void cleanJobItemErrorMessage(String jobId, int shardingItem);
}
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/AbstractPipelineJobAPIImpl.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/AbstractPipelineJobAPIImpl.java
index ef3c90dd26c..a2e51f2fa05 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/AbstractPipelineJobAPIImpl.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/AbstractPipelineJobAPIImpl.java
@@ -19,6 +19,8 @@ package org.apache.shardingsphere.data.pipeline.core.api.impl;
import com.google.common.base.Preconditions;
import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.lang.exception.ExceptionUtils;
+import org.apache.commons.lang3.ObjectUtils;
import org.apache.shardingsphere.data.pipeline.api.config.job.PipelineJobConfiguration;
import org.apache.shardingsphere.data.pipeline.api.config.job.yaml.YamlPipelineJobConfiguration;
import org.apache.shardingsphere.data.pipeline.api.config.process.PipelineProcessConfiguration;
@@ -231,4 +233,28 @@ public abstract class AbstractPipelineJobAPIImpl implements PipelineJobAPI {
public String getType() {
return getJobType().getTypeName();
}
+
+ @Override
+ public String getJobItemErrorMessage(final String jobId, final int shardingItem) {
+ return ObjectUtils.defaultIfNull(PipelineAPIFactory.getGovernanceRepositoryAPI().getJobItemErrorMessage(jobId, shardingItem), "");
+ }
+
+ @Override
+ public void persistJobItemErrorMessage(final String jobId, final int shardingItem, final Object error) {
+ String key = PipelineMetaDataNode.getJobItemErrorMessagePath(jobId, shardingItem);
+ String value = "";
+ if (null != error) {
+ if (error instanceof Throwable) {
+ value = ExceptionUtils.getStackTrace((Throwable) error);
+ } else {
+ value = error.toString();
+ }
+ }
+ PipelineAPIFactory.getGovernanceRepositoryAPI().persist(key, value);
+ }
+
+ @Override
+ public void cleanJobItemErrorMessage(final String jobId, final int shardingItem) {
+ PipelineAPIFactory.getGovernanceRepositoryAPI().cleanJobItemErrorMessage(jobId, shardingItem);
+ }
}
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/GovernanceRepositoryAPIImpl.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/GovernanceRepositoryAPIImpl.java
index a6dbbd1cc5b..00a51184f12 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/GovernanceRepositoryAPIImpl.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/GovernanceRepositoryAPIImpl.java
@@ -113,4 +113,14 @@ public final class GovernanceRepositoryAPIImpl implements GovernanceRepositoryAP
public void persistMetaDataProcessConfiguration(final JobType jobType, final String processConfigYamlText) {
repository.persist(PipelineMetaDataNode.getMetaDataProcessConfigPath(jobType), processConfigYamlText);
}
+
+ @Override
+ public String getJobItemErrorMessage(final String jobId, final int shardingItem) {
+ return repository.get(PipelineMetaDataNode.getJobItemErrorMessagePath(jobId, shardingItem));
+ }
+
+ @Override
+ public void cleanJobItemErrorMessage(final String jobId, final int shardingItem) {
+ repository.delete(PipelineMetaDataNode.getJobItemErrorMessagePath(jobId, shardingItem));
+ }
}
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/node/PipelineMetaDataNode.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/node/PipelineMetaDataNode.java
index 6a8eed153e0..6eeab56f1d7 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/node/PipelineMetaDataNode.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/node/PipelineMetaDataNode.java
@@ -148,4 +148,15 @@ public final class PipelineMetaDataNode {
public static String getJobBarrierDisablePath(final String jobId) {
return String.join("/", getJobRootPath(jobId), "barrier", "disable");
}
+
+ /**
+ * Get job item error message path.
+ *
+ * @param jobId job id
+ * @param shardingItem sharding item
+ * @return job item error message path
+ */
+ public static String getJobItemErrorMessagePath(final String jobId, final int shardingItem) {
+ return String.join("/", getJobRootPath(jobId), "error", Integer.toString(shardingItem));
+ }
}
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/InventoryIncrementalTasksRunner.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/InventoryIncrementalTasksRunner.java
index b9ea57c776c..ce4b7ac6ef9 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/InventoryIncrementalTasksRunner.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/InventoryIncrementalTasksRunner.java
@@ -111,7 +111,7 @@ public final class InventoryIncrementalTasksRunner implements PipelineTasksRunne
private ExecuteCallback createInventoryTaskCallback() {
return new ExecuteCallback() {
-
+
@Override
public void onSuccess() {
if (PipelineJobProgressDetector.allInventoryTasksFinished(inventoryTasks)) {
@@ -124,6 +124,8 @@ public final class InventoryIncrementalTasksRunner implements PipelineTasksRunne
public void onFailure(final Throwable throwable) {
log.error("Inventory task execute failed.", throwable);
updateLocalAndRemoteJobItemStatus(JobStatus.EXECUTE_INVENTORY_TASK_FAILURE);
+ PipelineAPIFactory.getPipelineJobAPI(PipelineJobIdUtils.parseJobType(jobItemContext.getJobId()))
+ .persistJobItemErrorMessage(jobItemContext.getJobId(), jobItemContext.getShardingItem(), throwable);
stop();
}
};
@@ -160,6 +162,8 @@ public final class InventoryIncrementalTasksRunner implements PipelineTasksRunne
public void onFailure(final Throwable throwable) {
log.error("Incremental task execute failed.", throwable);
updateLocalAndRemoteJobItemStatus(JobStatus.EXECUTE_INCREMENTAL_TASK_FAILURE);
+ PipelineAPIFactory.getPipelineJobAPI(PipelineJobIdUtils.parseJobType(jobItemContext.getJobId()))
+ .persistJobItemErrorMessage(jobItemContext.getJobId(), jobItemContext.getShardingItem(), throwable);
stop();
}
};
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJob.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJob.java
index 6c7c19bc713..817512e1d17 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJob.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJob.java
@@ -26,9 +26,11 @@ import org.apache.shardingsphere.data.pipeline.api.job.JobStatus;
import org.apache.shardingsphere.data.pipeline.api.job.PipelineJob;
import org.apache.shardingsphere.data.pipeline.api.job.progress.InventoryIncrementalJobItemProgress;
import org.apache.shardingsphere.data.pipeline.api.task.PipelineTasksRunner;
+import org.apache.shardingsphere.data.pipeline.core.api.PipelineAPIFactory;
import org.apache.shardingsphere.data.pipeline.core.datasource.DefaultPipelineDataSourceManager;
import org.apache.shardingsphere.data.pipeline.core.job.AbstractPipelineJob;
import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobCenter;
+import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobIdUtils;
import org.apache.shardingsphere.data.pipeline.core.job.progress.persist.PipelineJobProgressPersistService;
import org.apache.shardingsphere.data.pipeline.core.metadata.node.PipelineMetaDataNode;
import org.apache.shardingsphere.data.pipeline.core.task.InventoryIncrementalTasksRunner;
@@ -73,6 +75,7 @@ public final class MigrationJob extends AbstractPipelineJob implements SimpleJob
return;
}
log.info("start tasks runner, jobId={}, shardingItem={}", getJobId(), shardingItem);
+ PipelineAPIFactory.getPipelineJobAPI(PipelineJobIdUtils.parseJobType(jobItemContext.getJobId())).cleanJobItemErrorMessage(jobItemContext.getJobId(), jobItemContext.getShardingItem());
// TODO inventory and incremental tasks are always empty on construction
InventoryIncrementalTasksRunner tasksRunner = new InventoryIncrementalTasksRunner(jobItemContext, jobItemContext.getInventoryTasks(), jobItemContext.getIncrementalTasks(),
jobItemContext.getJobProcessContext().getInventoryDumperExecuteEngine(), jobItemContext.getJobProcessContext().getIncrementalDumperExecuteEngine());
@@ -92,9 +95,10 @@ public final class MigrationJob extends AbstractPipelineJob implements SimpleJob
} catch (final SQLException | RuntimeException ex) {
// CHECKSTYLE:ON
log.error("job prepare failed, {}-{}", getJobId(), jobItemContext.getShardingItem(), ex);
- PipelineJobCenter.stop(getJobId());
+ PipelineJobCenter.stop(jobItemContext.getJobId());
jobItemContext.setStatus(JobStatus.PREPARING_FAILURE);
jobAPI.persistJobItemProgress(jobItemContext);
+ jobAPI.persistJobItemErrorMessage(jobItemContext.getJobId(), jobItemContext.getShardingItem(), ex);
if (ex instanceof RuntimeException) {
throw (RuntimeException) ex;
}
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobAPIImpl.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobAPIImpl.java
index 087e6eea741..bd4f0e67664 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobAPIImpl.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobAPIImpl.java
@@ -271,6 +271,7 @@ public final class MigrationJobAPIImpl extends AbstractPipelineJobAPIImpl implem
InventoryIncrementalJobItemProgress jobItemProgress = getJobItemProgress(jobId, each);
if (null != jobItemProgress) {
jobItemProgress.setActive(!jobConfigPOJO.isDisabled());
+ jobItemProgress.setErrorMessage(getJobItemErrorMessage(jobId, each));
}
map.put(each, jobItemProgress);
}, LinkedHashMap::putAll);
diff --git a/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/base/BaseITCase.java b/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/base/BaseITCase.java
index 2105107d595..1b19f43cd99 100644
--- a/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/base/BaseITCase.java
+++ b/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/base/BaseITCase.java
@@ -276,6 +276,9 @@ public abstract class BaseITCase {
actualStatus = listJobStatus.stream().map(each -> each.get("status").toString()).collect(Collectors.toSet());
assertFalse(CollectionUtils.containsAny(actualStatus, Arrays.asList(JobStatus.PREPARING_FAILURE.name(), JobStatus.EXECUTE_INVENTORY_TASK_FAILURE.name(),
JobStatus.EXECUTE_INCREMENTAL_TASK_FAILURE.name())));
+ for (Map<String, Object> each : listJobStatus) {
+ assertTrue(StringUtils.isBlank(each.get("error_message").toString()));
+ }
if (actualStatus.size() == 1 && actualStatus.contains(JobStatus.EXECUTE_INCREMENTAL_TASK.name())) {
return listJobStatus;
} else if (actualStatus.size() >= 1 && actualStatus.containsAll(new HashSet<>(Arrays.asList("", JobStatus.EXECUTE_INCREMENTAL_TASK.name())))) {