You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@shardingsphere.apache.org by zh...@apache.org on 2022/09/19 12:56:14 UTC

[shardingsphere] branch master updated: Show migration status add error_msg column (#21065)

This is an automated email from the ASF dual-hosted git repository.

zhonghongsheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new ead6082cdf8 Show migration status add error_msg column (#21065)
ead6082cdf8 is described below

commit ead6082cdf83b5a673d9fe3e965fa951a3ec556e
Author: Xinze Guo <10...@users.noreply.github.com>
AuthorDate: Mon Sep 19 20:56:07 2022 +0800

    Show migration status add error_msg column (#21065)
    
    * Show migration status add error_msg column
    
    * Fix codestyle
---
 .../ShowMigrationJobStatusQueryResultSet.java      |  9 ++++++--
 .../InventoryIncrementalJobItemProgress.java       |  2 ++
 .../pipeline/core/api/GovernanceRepositoryAPI.java | 17 ++++++++++++++
 .../data/pipeline/core/api/PipelineJobAPI.java     | 26 ++++++++++++++++++++++
 .../core/api/impl/AbstractPipelineJobAPIImpl.java  | 26 ++++++++++++++++++++++
 .../core/api/impl/GovernanceRepositoryAPIImpl.java | 10 +++++++++
 .../core/metadata/node/PipelineMetaDataNode.java   | 11 +++++++++
 .../core/task/InventoryIncrementalTasksRunner.java |  6 ++++-
 .../pipeline/scenario/migration/MigrationJob.java  |  6 ++++-
 .../scenario/migration/MigrationJobAPIImpl.java    |  1 +
 .../data/pipeline/cases/base/BaseITCase.java       |  3 +++
 11 files changed, 113 insertions(+), 4 deletions(-)

diff --git a/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/query/ShowMigrationJobStatusQueryResultSet.java b/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/query/ShowMigrationJobStatusQueryResultSet.java
index 78eb9d3f80d..865b92f4de8 100644
--- a/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/query/ShowMigrationJobStatusQueryResultSet.java
+++ b/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/query/ShowMigrationJobStatusQueryResultSet.java
@@ -19,6 +19,7 @@ package org.apache.shardingsphere.migration.distsql.handler.query;
 
 import org.apache.shardingsphere.data.pipeline.api.MigrationJobPublicAPI;
 import org.apache.shardingsphere.data.pipeline.api.PipelineJobPublicAPIFactory;
+import org.apache.shardingsphere.data.pipeline.api.job.progress.InventoryIncrementalJobItemProgress;
 import org.apache.shardingsphere.infra.distsql.query.DatabaseDistSQLResultSet;
 import org.apache.shardingsphere.infra.metadata.database.ShardingSphereDatabase;
 import org.apache.shardingsphere.migration.distsql.statement.ShowMigrationStatusStatement;
@@ -28,6 +29,7 @@ import java.util.Arrays;
 import java.util.Collection;
 import java.util.Iterator;
 import java.util.LinkedList;
+import java.util.Map;
 import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
 
@@ -43,7 +45,8 @@ public final class ShowMigrationJobStatusQueryResultSet implements DatabaseDistS
     @Override
     public void init(final ShardingSphereDatabase database, final SQLStatement sqlStatement) {
         long currentTimeMillis = System.currentTimeMillis();
-        data = JOB_API.getJobProgress(((ShowMigrationStatusStatement) sqlStatement).getJobId()).entrySet().stream()
+        Map<Integer, InventoryIncrementalJobItemProgress> jobProgress = JOB_API.getJobProgress(((ShowMigrationStatusStatement) sqlStatement).getJobId());
+        data = jobProgress.entrySet().stream()
                 .map(entry -> {
                     Collection<Object> result = new LinkedList<>();
                     result.add(entry.getKey());
@@ -55,6 +58,7 @@ public final class ShowMigrationJobStatusQueryResultSet implements DatabaseDistS
                         result.add(entry.getValue().getInventory().getInventoryFinishedPercentage());
                         long latestActiveTimeMillis = entry.getValue().getIncremental().getIncrementalLatestActiveTimeMillis();
                         result.add(latestActiveTimeMillis > 0 ? TimeUnit.MILLISECONDS.toSeconds(currentTimeMillis - latestActiveTimeMillis) : 0);
+                        result.add(entry.getValue().getErrorMessage());
                     } else {
                         result.add("");
                         result.add("");
@@ -62,6 +66,7 @@ public final class ShowMigrationJobStatusQueryResultSet implements DatabaseDistS
                         result.add("");
                         result.add("");
                         result.add("");
+                        result.add("");
                     }
                     return result;
                 }).collect(Collectors.toList()).iterator();
@@ -69,7 +74,7 @@ public final class ShowMigrationJobStatusQueryResultSet implements DatabaseDistS
     
     @Override
     public Collection<String> getColumnNames() {
-        return Arrays.asList("item", "data_source", "status", "active", "processed_records_count", "inventory_finished_percentage", "incremental_idle_seconds");
+        return Arrays.asList("item", "data_source", "status", "active", "processed_records_count", "inventory_finished_percentage", "incremental_idle_seconds", "error_message");
     }
     
     @Override
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/job/progress/InventoryIncrementalJobItemProgress.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/job/progress/InventoryIncrementalJobItemProgress.java
index 7d08abc8788..659f1d90330 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/job/progress/InventoryIncrementalJobItemProgress.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/job/progress/InventoryIncrementalJobItemProgress.java
@@ -41,4 +41,6 @@ public final class InventoryIncrementalJobItemProgress implements PipelineJobIte
     private JobItemIncrementalTasksProgress incremental;
     
     private long processedRecordsCount;
+    
+    private transient String errorMessage;
 }
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/GovernanceRepositoryAPI.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/GovernanceRepositoryAPI.java
index d745f71b1e3..e48f01c66e2 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/GovernanceRepositoryAPI.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/GovernanceRepositoryAPI.java
@@ -140,4 +140,21 @@ public interface GovernanceRepositoryAPI {
      * @param processConfigYamlText process configuration YAML text
      */
     void persistMetaDataProcessConfiguration(JobType jobType, String processConfigYamlText);
+    
+    /**
+     * Get job item error message.
+     *
+     * @param jobId job id
+     * @param shardingItem sharding item
+     * @return error message
+     */
+    String getJobItemErrorMessage(String jobId, int shardingItem);
+    
+    /**
+     * Clean job item error message.
+     *
+     * @param jobId job id
+     * @param shardingItem sharding item
+     */
+    void cleanJobItemErrorMessage(String jobId, int shardingItem);
 }
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/PipelineJobAPI.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/PipelineJobAPI.java
index 9ac14928a7b..fa31460c16f 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/PipelineJobAPI.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/PipelineJobAPI.java
@@ -83,4 +83,30 @@ public interface PipelineJobAPI extends PipelineJobPublicAPI, PipelineJobItemAPI
      * @return job configuration
      */
     PipelineJobConfiguration getJobConfiguration(String jobId);
+    
+    /**
+     * Get job item error message.
+     *
+     * @param jobId job id
+     * @param shardingItem sharding item
+     * @return error message
+     */
+    String getJobItemErrorMessage(String jobId, int shardingItem);
+    
+    /**
+     * Persist job item error message.
+     *
+     * @param jobId job id
+     * @param shardingItem sharding item
+     * @param error error
+     */
+    void persistJobItemErrorMessage(String jobId, int shardingItem, Object error);
+    
+    /**
+     * Clean job item error message.
+     *
+     * @param jobId job id
+     * @param shardingItem sharding item
+     */
+    void cleanJobItemErrorMessage(String jobId, int shardingItem);
 }
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/AbstractPipelineJobAPIImpl.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/AbstractPipelineJobAPIImpl.java
index ef3c90dd26c..a2e51f2fa05 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/AbstractPipelineJobAPIImpl.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/AbstractPipelineJobAPIImpl.java
@@ -19,6 +19,8 @@ package org.apache.shardingsphere.data.pipeline.core.api.impl;
 
 import com.google.common.base.Preconditions;
 import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.lang.exception.ExceptionUtils;
+import org.apache.commons.lang3.ObjectUtils;
 import org.apache.shardingsphere.data.pipeline.api.config.job.PipelineJobConfiguration;
 import org.apache.shardingsphere.data.pipeline.api.config.job.yaml.YamlPipelineJobConfiguration;
 import org.apache.shardingsphere.data.pipeline.api.config.process.PipelineProcessConfiguration;
@@ -231,4 +233,28 @@ public abstract class AbstractPipelineJobAPIImpl implements PipelineJobAPI {
     public String getType() {
         return getJobType().getTypeName();
     }
+    
+    @Override
+    public String getJobItemErrorMessage(final String jobId, final int shardingItem) {
+        return ObjectUtils.defaultIfNull(PipelineAPIFactory.getGovernanceRepositoryAPI().getJobItemErrorMessage(jobId, shardingItem), "");
+    }
+    
+    @Override
+    public void persistJobItemErrorMessage(final String jobId, final int shardingItem, final Object error) {
+        String key = PipelineMetaDataNode.getJobItemErrorMessagePath(jobId, shardingItem);
+        String value = "";
+        if (null != error) {
+            if (error instanceof Throwable) {
+                value = ExceptionUtils.getStackTrace((Throwable) error);
+            } else {
+                value = error.toString();
+            }
+        }
+        PipelineAPIFactory.getGovernanceRepositoryAPI().persist(key, value);
+    }
+    
+    @Override
+    public void cleanJobItemErrorMessage(final String jobId, final int shardingItem) {
+        PipelineAPIFactory.getGovernanceRepositoryAPI().cleanJobItemErrorMessage(jobId, shardingItem);
+    }
 }
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/GovernanceRepositoryAPIImpl.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/GovernanceRepositoryAPIImpl.java
index a6dbbd1cc5b..00a51184f12 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/GovernanceRepositoryAPIImpl.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/GovernanceRepositoryAPIImpl.java
@@ -113,4 +113,14 @@ public final class GovernanceRepositoryAPIImpl implements GovernanceRepositoryAP
     public void persistMetaDataProcessConfiguration(final JobType jobType, final String processConfigYamlText) {
         repository.persist(PipelineMetaDataNode.getMetaDataProcessConfigPath(jobType), processConfigYamlText);
     }
+    
+    @Override
+    public String getJobItemErrorMessage(final String jobId, final int shardingItem) {
+        return repository.get(PipelineMetaDataNode.getJobItemErrorMessagePath(jobId, shardingItem));
+    }
+    
+    @Override
+    public void cleanJobItemErrorMessage(final String jobId, final int shardingItem) {
+        repository.delete(PipelineMetaDataNode.getJobItemErrorMessagePath(jobId, shardingItem));
+    }
 }
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/node/PipelineMetaDataNode.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/node/PipelineMetaDataNode.java
index 6a8eed153e0..6eeab56f1d7 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/node/PipelineMetaDataNode.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/node/PipelineMetaDataNode.java
@@ -148,4 +148,15 @@ public final class PipelineMetaDataNode {
     public static String getJobBarrierDisablePath(final String jobId) {
         return String.join("/", getJobRootPath(jobId), "barrier", "disable");
     }
+    
+    /**
+     * Get job item error message path.
+     *
+     * @param jobId job id
+     * @param shardingItem sharding item
+     * @return job item error message path
+     */
+    public static String getJobItemErrorMessagePath(final String jobId, final int shardingItem) {
+        return String.join("/", getJobRootPath(jobId), "error", Integer.toString(shardingItem));
+    }
 }
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/InventoryIncrementalTasksRunner.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/InventoryIncrementalTasksRunner.java
index b9ea57c776c..ce4b7ac6ef9 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/InventoryIncrementalTasksRunner.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/InventoryIncrementalTasksRunner.java
@@ -111,7 +111,7 @@ public final class InventoryIncrementalTasksRunner implements PipelineTasksRunne
     
     private ExecuteCallback createInventoryTaskCallback() {
         return new ExecuteCallback() {
-            
+    
             @Override
             public void onSuccess() {
                 if (PipelineJobProgressDetector.allInventoryTasksFinished(inventoryTasks)) {
@@ -124,6 +124,8 @@ public final class InventoryIncrementalTasksRunner implements PipelineTasksRunne
             public void onFailure(final Throwable throwable) {
                 log.error("Inventory task execute failed.", throwable);
                 updateLocalAndRemoteJobItemStatus(JobStatus.EXECUTE_INVENTORY_TASK_FAILURE);
+                PipelineAPIFactory.getPipelineJobAPI(PipelineJobIdUtils.parseJobType(jobItemContext.getJobId()))
+                        .persistJobItemErrorMessage(jobItemContext.getJobId(), jobItemContext.getShardingItem(), throwable);
                 stop();
             }
         };
@@ -160,6 +162,8 @@ public final class InventoryIncrementalTasksRunner implements PipelineTasksRunne
             public void onFailure(final Throwable throwable) {
                 log.error("Incremental task execute failed.", throwable);
                 updateLocalAndRemoteJobItemStatus(JobStatus.EXECUTE_INCREMENTAL_TASK_FAILURE);
+                PipelineAPIFactory.getPipelineJobAPI(PipelineJobIdUtils.parseJobType(jobItemContext.getJobId()))
+                        .persistJobItemErrorMessage(jobItemContext.getJobId(), jobItemContext.getShardingItem(), throwable);
                 stop();
             }
         };
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJob.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJob.java
index 6c7c19bc713..817512e1d17 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJob.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJob.java
@@ -26,9 +26,11 @@ import org.apache.shardingsphere.data.pipeline.api.job.JobStatus;
 import org.apache.shardingsphere.data.pipeline.api.job.PipelineJob;
 import org.apache.shardingsphere.data.pipeline.api.job.progress.InventoryIncrementalJobItemProgress;
 import org.apache.shardingsphere.data.pipeline.api.task.PipelineTasksRunner;
+import org.apache.shardingsphere.data.pipeline.core.api.PipelineAPIFactory;
 import org.apache.shardingsphere.data.pipeline.core.datasource.DefaultPipelineDataSourceManager;
 import org.apache.shardingsphere.data.pipeline.core.job.AbstractPipelineJob;
 import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobCenter;
+import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobIdUtils;
 import org.apache.shardingsphere.data.pipeline.core.job.progress.persist.PipelineJobProgressPersistService;
 import org.apache.shardingsphere.data.pipeline.core.metadata.node.PipelineMetaDataNode;
 import org.apache.shardingsphere.data.pipeline.core.task.InventoryIncrementalTasksRunner;
@@ -73,6 +75,7 @@ public final class MigrationJob extends AbstractPipelineJob implements SimpleJob
             return;
         }
         log.info("start tasks runner, jobId={}, shardingItem={}", getJobId(), shardingItem);
+        PipelineAPIFactory.getPipelineJobAPI(PipelineJobIdUtils.parseJobType(jobItemContext.getJobId())).cleanJobItemErrorMessage(jobItemContext.getJobId(), jobItemContext.getShardingItem());
         // TODO inventory and incremental tasks are always empty on construction
         InventoryIncrementalTasksRunner tasksRunner = new InventoryIncrementalTasksRunner(jobItemContext, jobItemContext.getInventoryTasks(), jobItemContext.getIncrementalTasks(),
                 jobItemContext.getJobProcessContext().getInventoryDumperExecuteEngine(), jobItemContext.getJobProcessContext().getIncrementalDumperExecuteEngine());
@@ -92,9 +95,10 @@ public final class MigrationJob extends AbstractPipelineJob implements SimpleJob
         } catch (final SQLException | RuntimeException ex) {
             // CHECKSTYLE:ON
             log.error("job prepare failed, {}-{}", getJobId(), jobItemContext.getShardingItem(), ex);
-            PipelineJobCenter.stop(getJobId());
+            PipelineJobCenter.stop(jobItemContext.getJobId());
             jobItemContext.setStatus(JobStatus.PREPARING_FAILURE);
             jobAPI.persistJobItemProgress(jobItemContext);
+            jobAPI.persistJobItemErrorMessage(jobItemContext.getJobId(), jobItemContext.getShardingItem(), ex);
             if (ex instanceof RuntimeException) {
                 throw (RuntimeException) ex;
             }
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobAPIImpl.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobAPIImpl.java
index 087e6eea741..bd4f0e67664 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobAPIImpl.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobAPIImpl.java
@@ -271,6 +271,7 @@ public final class MigrationJobAPIImpl extends AbstractPipelineJobAPIImpl implem
             InventoryIncrementalJobItemProgress jobItemProgress = getJobItemProgress(jobId, each);
             if (null != jobItemProgress) {
                 jobItemProgress.setActive(!jobConfigPOJO.isDisabled());
+                jobItemProgress.setErrorMessage(getJobItemErrorMessage(jobId, each));
             }
             map.put(each, jobItemProgress);
         }, LinkedHashMap::putAll);
diff --git a/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/base/BaseITCase.java b/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/base/BaseITCase.java
index 2105107d595..1b19f43cd99 100644
--- a/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/base/BaseITCase.java
+++ b/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/base/BaseITCase.java
@@ -276,6 +276,9 @@ public abstract class BaseITCase {
             actualStatus = listJobStatus.stream().map(each -> each.get("status").toString()).collect(Collectors.toSet());
             assertFalse(CollectionUtils.containsAny(actualStatus, Arrays.asList(JobStatus.PREPARING_FAILURE.name(), JobStatus.EXECUTE_INVENTORY_TASK_FAILURE.name(),
                     JobStatus.EXECUTE_INCREMENTAL_TASK_FAILURE.name())));
+            for (Map<String, Object> each : listJobStatus) {
+                assertTrue(StringUtils.isBlank(each.get("error_message").toString()));
+            }
             if (actualStatus.size() == 1 && actualStatus.contains(JobStatus.EXECUTE_INCREMENTAL_TASK.name())) {
                 return listJobStatus;
             } else if (actualStatus.size() >= 1 && actualStatus.containsAll(new HashSet<>(Arrays.asList("", JobStatus.EXECUTE_INCREMENTAL_TASK.name())))) {