You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@shardingsphere.apache.org by pa...@apache.org on 2022/09/15 01:44:54 UTC

[shardingsphere] branch master updated: Add PrepareJobWithoutEnoughPrivilegeException (#20986)

This is an automated email from the ASF dual-hosted git repository.

panjuan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new d00f1bd1ac6 Add PrepareJobWithoutEnoughPrivilegeException (#20986)
d00f1bd1ac6 is described below

commit d00f1bd1ac6fbf551d606a7eece0681c5e992bd5
Author: Liang Zhang <zh...@apache.org>
AuthorDate: Thu Sep 15 09:44:35 2022 +0800

    Add PrepareJobWithoutEnoughPrivilegeException (#20986)
---
 docs/document/content/overview/_index.cn.md         | 20 ++++++++++----------
 docs/document/content/overview/_index.en.md         | 21 ++++++++++-----------
 .../user-manual/error-code/sql-error-code.cn.md     |  2 ++
 .../user-manual/error-code/sql-error-code.en.md     |  2 ++
 .../check/datasource/AbstractDataSourceChecker.java |  4 ++--
 ...areJobWithInvalidSourceDataSourceException.java} | 10 +++++-----
 ...PrepareJobWithTargetTableNotEmptyException.java} |  6 +++---
 ... PrepareJobWithoutEnoughPrivilegeException.java} |  8 ++++----
 .../check/datasource/MySQLDataSourceChecker.java    | 14 +++++++-------
 .../datasource/MySQLDataSourceCheckerTest.java      |  6 ++++--
 .../datasource/AbstractDataSourceCheckerTest.java   |  4 ++--
 11 files changed, 51 insertions(+), 46 deletions(-)

diff --git a/docs/document/content/overview/_index.cn.md b/docs/document/content/overview/_index.cn.md
index 697a2a7667f..2f479f846f8 100644
--- a/docs/document/content/overview/_index.cn.md
+++ b/docs/document/content/overview/_index.cn.md
@@ -9,8 +9,8 @@ chapter = true
 
 ### 介绍
 
-Apache ShardingSphere 是一款开源的分布式数据库生态项目,由 JDBC 和 Proxy 两款产品组成。
-其核心采用微内核+可插拔架构,通过插件开放扩展功能。它提供多源异构数据库增强平台,进而围绕其上层构建生态。
+Apache ShardingSphere 是一款分布式的数据库生态系统,
+可以将任意数据库转换为分布式数据库,并通过数据分片、弹性伸缩、加密等能力对原有数据库进行增强。
 
 Apache ShardingSphere 设计哲学为 Database Plus,旨在构建异构数据库上层的标准和生态。
 它关注如何充分合理地利用数据库的计算和存储能力,而并非实现一个全新的数据库。
@@ -34,14 +34,14 @@ ShardingSphere-Proxy 定位为透明化的数据库代理端,通过实现数
 
 | 特性       | 定义 |
 | --------- | ----- |
-| 数据分片   | 数据分片,是应对海量数据存储与计算的有效手段。ShardingSphere 提供基于底层数据库之上,可计算与存储水平扩展的分布式数据库解决方案。 |
-| 分布式事务 | 事务能力,是保障数据库完整、安全的关键技术,也是数据库的核心技术之一。ShardingSphere 提供在单机数据库之上的分布式事务能力,可实现跨底层数据源的数据安全。 |
-| 读写分离   | 读写分离,是应对高压力业务访问的手段之一。ShardingSphere 基于对SQL语义理解及底层数据库拓扑感知能力,提供灵活、安全的读写分离能力,且可实现读访问的负载均衡。 |
-| 高可用     | 高可用,是对数据存储计算平台的基本要求。ShardingSphere 基于无状态服务,提供高可用计算服务访问;同时可感知并利用底层数据库自身高可用实现整体的高可用能力。 |
-| 数据迁移   | 数据迁移,是打通数据生态的关键能力。SharingSphere 提供基于数据全场景的迁移能力,可应对业务数据量激增的场景。 |
-| 联邦查询   | 联邦查询,是面对复杂数据环境下利用数据的有效手段之一。ShardingSphere 提供跨数据源的复杂数据查询分析能力,简化并提升数据使用体验。 |
-| 数据加密   | 数据加密,是保证数据安全的基本手段。ShardingSphere 提供一套完整的、透明化、安全的、低改造成本的数据加密解决方案。 |
-| 影子库     | 在全链路压测场景下,ShardingSphere 通过影子库功能支持在复杂压测场景下数据隔离,压测获得测试结果可准确反应系统真实容量和性能水平。 |
+| 数据分片   | 数据分片,是应对海量数据存储与计算的有效手段。ShardingSphere 基于底层数据库提供分布式数据库解决方案,可以水平扩展计算和存储。 |
+| 分布式事务 | 事务能力,是保障数据库完整、安全的关键技术,也是数据库的核心技术。基于 XA 和 BASE 的混合事务引擎,ShardingSphere 提供在独立数据库上的分布式事务功能,保证跨数据源的数据安全。 |
+| 读写分离   | 读写分离,是应对高压力业务访问的手段。基于对 SQL 语义理解及对底层数据库拓扑感知能力,ShardingSphere 提供灵活的读写流量拆分和读流量负载均衡。 |
+| 高可用     | 高可用,是对数据存储计算平台的基本要求。ShardingSphere 提供基于原生或 Kubernetes 环境下数据库集群的分布式高可用能力。|
+| 数据迁移   | 数据迁移,是打通数据生态的关键能力。ShardingSphere 提供跨数据源的数据迁移能力,并可支持重分片扩展。 |
+| 联邦查询   | 联邦查询,是面对复杂数据环境下利用数据的有效手段。ShardingSphere 提供跨数据源的复杂查询分析能力,实现跨源的数据关联与聚合。 |
+| 数据加密   | 数据加密,是保证数据安全的基本手段。ShardingSphere 提供完整、透明、安全、低成本的数据加密解决方案。 |
+| 影子库     | 在全链路压测场景下,ShardingSphere 支持不同工作负载下的数据隔离,避免测试数据污染生产环境。 |
 
 ### 产品优势
 
diff --git a/docs/document/content/overview/_index.en.md b/docs/document/content/overview/_index.en.md
index fd74e29af41..6ab76e3822f 100644
--- a/docs/document/content/overview/_index.en.md
+++ b/docs/document/content/overview/_index.en.md
@@ -9,9 +9,8 @@ chapter = true
 
 ### Introduction
 
-Apache ShardingSphere is an open source ecosystem that allows you to transform any database into a distributed database system. 
-The project includes a JDBC and a Proxy, and its core adopts a micro-kernel and pluggable architecture.
-Thanks to its plugin-oriented architecture, features can be flexibly expanded at will.
+Apache ShardingSphere is an ecosystem to transform any database into a distributed database system,
+and enhance it with sharding, elastic scaling, encryption features & more.
 
 The project is committed to providing a multi-source heterogeneous, enhanced database platform and further building an ecosystem around the upper layer of the platform.
 Database Plus, the design philosophy of Apache ShardingSphere, aims at building the standard and ecosystem on the upper layer of the heterogeneous database.
@@ -36,14 +35,14 @@ ShardingSphere-Proxy is a transparent database proxy, providing a database serve
 
 | Feature                 | Definition |
 | ----------------------- | ---------- |
-| Data Sharding           | Data sharding is an effective way to deal with massive data storage and computing. ShardingSphere provides distributed database solutions that can scale out computing and storage levels on top of the underlying database. |
-| Distributed Transaction | Transactional capability is key to ensuring database integrity and security and is also one of the databases' core technologies. ShardingSphere provides distributed transaction capability on top of a single database, which can achieve data security across underlying data sources. |
-| Read/write Splitting    | Read/write splitting can be used to cope with business access with high stress. Based on its understanding of SQL semantics and the topological awareness of the underlying database, ShardingSphere provides flexible and secure read/write splitting capabilities and can achieve load balancing for read access. |
-| High Availability       | High availability is a basic requirement for a data storage and computing platform. ShardingSphere provides access to high-availability computing services based on stateless services. At the same time, it can sense and use the underlying database's HA solution to achieve its overall high availability. |
-| Data Migration          | Data migration is the key to connecting data ecosystems. ShardingSphere provides full-scenario data migration capability for users, which can cope with the surge of business data volume. |
-| Federated Query         | Federated queries are effective in utilizing data in a complex data environment. ShardingSphere is capable of querying and analyzing complex data across data sources, simplifying and improving the data usage experience. |
-| Data Encryption         | Data Encryption is a basic way to ensure data security. ShardingSphere provides a set of data encryption solutions that are complete, secure, transparent, and with low transformation costs. |
-| Shadow Database         | In the full-link stress testing scenario, ShardingSphere shadow DB is used for providing data isolation support for complex testing work. The obtained testing result can accurately reflect the system's true capacity and performance. |
+| Data Sharding           | Data sharding is an effective way to deal with massive data storage and computing. ShardingSphere provides a distributed database solution based on the underlying database, which can scale computing and storage horizontally. |
+| Distributed Transaction | Transactional capability is key to ensuring database integrity and security and is also one of the databases' core technologies. With a hybrid engine based on XA and BASE transactions, ShardingSphere provides distributed transaction capabilities on top of standalone databases, enabling data security across underlying data sources. |
+| Read/write Splitting    | Read/write splitting can be used to cope with business access with high stress. ShardingSphere provides flexible read/write splitting capabilities and can achieve read access load balancing based on the understanding of SQL semantics and the ability to perceive the underlying database topology. |
+| High Availability       | High availability is a basic requirement for a data storage and computing platform. Guarantee the HA of your distributed database cluster with ShardingSphere’s Operator on Kubernetes, and the native HA of your existing data sources. |
+| Data Migration          | Data migration is the key to connecting data ecosystems. SharingSphere provides migration capabilities to help users migrate the data from other data sources, while simultaneously performing data sharding. |
+| Query Federation        | Federated queries are effective in utilizing data in a complex data environment. ShardingSphere provides complex data query and analysis capabilities across data sources, simplifying the data aggregation from different data locations. |
+| Data Encryption         | Data Encryption is a basic way to ensure data security. ShardingSphere provides a complete, transparent, secure, and low-cost data encryption solution. |
+| Shadow Database         | In full-link online load testing scenarios, ShardingSphere supports data isolation in complex load testing scenarios through the shadow database function. Execute your load testing scenarios in a production environment without worrying about test data polluting your production data. |
 
 ### Advantages
 
diff --git a/docs/document/content/user-manual/error-code/sql-error-code.cn.md b/docs/document/content/user-manual/error-code/sql-error-code.cn.md
index aab47263026..c6005cf0694 100644
--- a/docs/document/content/user-manual/error-code/sql-error-code.cn.md
+++ b/docs/document/content/user-manual/error-code/sql-error-code.cn.md
@@ -97,6 +97,8 @@ SQL 错误码以标准的 SQL State,Vendor Code 和详细错误信息提供,
 | HY000     | 18082       | Sharding count of job \`%s\` is 0 |
 | HY000     | 18083       | Can not split range for table \`%s\`, reason: %s |
 | HY000     | 18084       | Target table \`%s\` is not empty |
+| 01007     | 18085       | Source data source is lack of REPLICATION SLAVE, REPLICATION CLIENT ON *.* privileges |
+| HY000     | 18086       | Source data source required \`%s = %s\`, now is \`%s\` |
 | HY000     | 18090       | Importer job write data failed |
 | HY000     | 18091       | Can not poll event because of binlog sync channel already closed |
 | HY000     | 18092       | Task \`%s\` execute failed |
diff --git a/docs/document/content/user-manual/error-code/sql-error-code.en.md b/docs/document/content/user-manual/error-code/sql-error-code.en.md
index 2f8c5384817..8f1442927f1 100644
--- a/docs/document/content/user-manual/error-code/sql-error-code.en.md
+++ b/docs/document/content/user-manual/error-code/sql-error-code.en.md
@@ -97,6 +97,8 @@ SQL error codes provide by standard `SQL State`, `Vendor Code` and `Reason`, whi
 | HY000     | 18082       | Sharding count of job \`%s\` is 0 |
 | HY000     | 18083       | Can not split range for table \`%s\`, reason: %s |
 | HY000     | 18084       | Target table \`%s\` is not empty |
+| 01007     | 18085       | Source data source is lack of REPLICATION SLAVE, REPLICATION CLIENT ON *.* privileges |
+| HY000     | 18086       | Source data source required \`%s = %s\`, now is \`%s\` |
 | HY000     | 18090       | Importer job write data failed |
 | HY000     | 18091       | Can not poll event because of binlog sync channel already closed |
 | HY000     | 18092       | Task \`%s\` execute failed |
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/datasource/AbstractDataSourceChecker.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/datasource/AbstractDataSourceChecker.java
index da7c6977e3f..b5e73edddbb 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/datasource/AbstractDataSourceChecker.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/datasource/AbstractDataSourceChecker.java
@@ -20,7 +20,7 @@ package org.apache.shardingsphere.data.pipeline.core.check.datasource;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.shardingsphere.data.pipeline.api.config.TableNameSchemaNameMapping;
 import org.apache.shardingsphere.data.pipeline.core.exception.job.PipelineJobPrepareFailedException;
-import org.apache.shardingsphere.data.pipeline.core.exception.job.TargetTableNotEmptyWhenPrepareMigrationException;
+import org.apache.shardingsphere.data.pipeline.core.exception.job.PrepareJobWithTargetTableNotEmptyException;
 import org.apache.shardingsphere.data.pipeline.core.sqlbuilder.PipelineSQLBuilderFactory;
 import org.apache.shardingsphere.data.pipeline.spi.check.datasource.DataSourceChecker;
 import org.apache.shardingsphere.data.pipeline.spi.sqlbuilder.PipelineSQLBuilder;
@@ -69,7 +69,7 @@ public abstract class AbstractDataSourceChecker implements DataSourceChecker {
                     Connection connection = dataSource.getConnection();
                     PreparedStatement preparedStatement = connection.prepareStatement(sql);
                     ResultSet resultSet = preparedStatement.executeQuery()) {
-                ShardingSpherePreconditions.checkState(!resultSet.next(), new TargetTableNotEmptyWhenPrepareMigrationException(each));
+                ShardingSpherePreconditions.checkState(!resultSet.next(), new PrepareJobWithTargetTableNotEmptyException(each));
             }
         }
     }
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/exception/job/TargetTableNotEmptyWhenPrepareMigrationException.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/exception/job/PrepareJobWithInvalidSourceDataSourceException.java
similarity index 67%
copy from shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/exception/job/TargetTableNotEmptyWhenPrepareMigrationException.java
copy to shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/exception/job/PrepareJobWithInvalidSourceDataSourceException.java
index e4cd1cea989..00118c9b34d 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/exception/job/TargetTableNotEmptyWhenPrepareMigrationException.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/exception/job/PrepareJobWithInvalidSourceDataSourceException.java
@@ -21,13 +21,13 @@ import org.apache.shardingsphere.data.pipeline.core.exception.PipelineSQLExcepti
 import org.apache.shardingsphere.infra.util.exception.external.sql.sqlstate.XOpenSQLState;
 
 /**
- * Target table not empty when prepare migration exception.
+ * Prepare job with invalid source data source exception.
  */
-public final class TargetTableNotEmptyWhenPrepareMigrationException extends PipelineSQLException {
+public final class PrepareJobWithInvalidSourceDataSourceException extends PipelineSQLException {
     
-    private static final long serialVersionUID = -8462039913248251254L;
+    private static final long serialVersionUID = -7710035889344958565L;
     
-    public TargetTableNotEmptyWhenPrepareMigrationException(final String tableName) {
-        super(XOpenSQLState.GENERAL_ERROR, 84, "Target table `%s` is not empty before migration", tableName);
+    public PrepareJobWithInvalidSourceDataSourceException(final String dataSourceKey, final String toBeCheckedValue, final String actualValue) {
+        super(XOpenSQLState.GENERAL_ERROR, 86, "Source data source required `%s = %s`, now is `%s`", dataSourceKey, toBeCheckedValue, actualValue);
     }
 }
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/exception/job/TargetTableNotEmptyWhenPrepareMigrationException.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/exception/job/PrepareJobWithTargetTableNotEmptyException.java
similarity index 83%
copy from shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/exception/job/TargetTableNotEmptyWhenPrepareMigrationException.java
copy to shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/exception/job/PrepareJobWithTargetTableNotEmptyException.java
index e4cd1cea989..f25d4614adb 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/exception/job/TargetTableNotEmptyWhenPrepareMigrationException.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/exception/job/PrepareJobWithTargetTableNotEmptyException.java
@@ -21,13 +21,13 @@ import org.apache.shardingsphere.data.pipeline.core.exception.PipelineSQLExcepti
 import org.apache.shardingsphere.infra.util.exception.external.sql.sqlstate.XOpenSQLState;
 
 /**
- * Target table not empty when prepare migration exception.
+ * Prepare job with target table not empty exception.
  */
-public final class TargetTableNotEmptyWhenPrepareMigrationException extends PipelineSQLException {
+public final class PrepareJobWithTargetTableNotEmptyException extends PipelineSQLException {
     
     private static final long serialVersionUID = -8462039913248251254L;
     
-    public TargetTableNotEmptyWhenPrepareMigrationException(final String tableName) {
+    public PrepareJobWithTargetTableNotEmptyException(final String tableName) {
         super(XOpenSQLState.GENERAL_ERROR, 84, "Target table `%s` is not empty before migration", tableName);
     }
 }
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/exception/job/TargetTableNotEmptyWhenPrepareMigrationException.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/exception/job/PrepareJobWithoutEnoughPrivilegeException.java
similarity index 76%
rename from shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/exception/job/TargetTableNotEmptyWhenPrepareMigrationException.java
rename to shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/exception/job/PrepareJobWithoutEnoughPrivilegeException.java
index e4cd1cea989..b575b5f02f7 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/exception/job/TargetTableNotEmptyWhenPrepareMigrationException.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/exception/job/PrepareJobWithoutEnoughPrivilegeException.java
@@ -21,13 +21,13 @@ import org.apache.shardingsphere.data.pipeline.core.exception.PipelineSQLExcepti
 import org.apache.shardingsphere.infra.util.exception.external.sql.sqlstate.XOpenSQLState;
 
 /**
- * Target table not empty when prepare migration exception.
+ * Prepare job without enough privilege exception.
  */
-public final class TargetTableNotEmptyWhenPrepareMigrationException extends PipelineSQLException {
+public final class PrepareJobWithoutEnoughPrivilegeException extends PipelineSQLException {
     
     private static final long serialVersionUID = -8462039913248251254L;
     
-    public TargetTableNotEmptyWhenPrepareMigrationException(final String tableName) {
-        super(XOpenSQLState.GENERAL_ERROR, 84, "Target table `%s` is not empty before migration", tableName);
+    public PrepareJobWithoutEnoughPrivilegeException() {
+        super(XOpenSQLState.PRIVILEGE_NOT_GRANTED, 85, "Source data source is lack of REPLICATION SLAVE, REPLICATION CLIENT ON *.* privileges");
     }
 }
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/check/datasource/MySQLDataSourceChecker.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/check/datasource/MySQLDataSourceChecker.java
index ca3ee89a1f8..33be9eb3dc8 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/check/datasource/MySQLDataSourceChecker.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/check/datasource/MySQLDataSourceChecker.java
@@ -19,6 +19,9 @@ package org.apache.shardingsphere.data.pipeline.mysql.check.datasource;
 
 import org.apache.shardingsphere.data.pipeline.core.check.datasource.AbstractDataSourceChecker;
 import org.apache.shardingsphere.data.pipeline.core.exception.job.PipelineJobPrepareFailedException;
+import org.apache.shardingsphere.data.pipeline.core.exception.job.PrepareJobWithInvalidSourceDataSourceException;
+import org.apache.shardingsphere.data.pipeline.core.exception.job.PrepareJobWithoutEnoughPrivilegeException;
+import org.apache.shardingsphere.infra.util.exception.ShardingSpherePreconditions;
 
 import javax.sql.DataSource;
 import java.sql.Connection;
@@ -73,7 +76,7 @@ public final class MySQLDataSourceChecker extends AbstractDataSourceChecker {
         } catch (final SQLException ex) {
             throw new PipelineJobPrepareFailedException("Source data source check privileges failed.", ex);
         }
-        throw new PipelineJobPrepareFailedException("Source data source is lack of REPLICATION SLAVE, REPLICATION CLIENT ON *.* privileges.");
+        throw new PrepareJobWithoutEnoughPrivilegeException();
     }
     
     private boolean matchPrivileges(final String privilege) {
@@ -101,12 +104,9 @@ public final class MySQLDataSourceChecker extends AbstractDataSourceChecker {
         try (PreparedStatement preparedStatement = connection.prepareStatement(SHOW_VARIABLES_SQL)) {
             preparedStatement.setString(1, key);
             try (ResultSet resultSet = preparedStatement.executeQuery()) {
-                if (!resultSet.next() && BINLOG_ROW_IMAGE.equalsIgnoreCase(key)) {
-                    return;
-                }
-                String actualValue = resultSet.getString(2);
-                if (!toBeCheckedValue.equalsIgnoreCase(actualValue)) {
-                    throw new PipelineJobPrepareFailedException(String.format("Source data source required `%s = %s`, now is `%s`", key, toBeCheckedValue, actualValue));
+                if (resultSet.next() || !BINLOG_ROW_IMAGE.equalsIgnoreCase(key)) {
+                    String actualValue = resultSet.getString(2);
+                    ShardingSpherePreconditions.checkState(toBeCheckedValue.equalsIgnoreCase(actualValue), new PrepareJobWithInvalidSourceDataSourceException(key, toBeCheckedValue, actualValue));
                 }
             }
         }
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/check/datasource/MySQLDataSourceCheckerTest.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/check/datasource/MySQLDataSourceCheckerTest.java
index 4c2d8781d76..f6da65307cf 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/check/datasource/MySQLDataSourceCheckerTest.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/check/datasource/MySQLDataSourceCheckerTest.java
@@ -18,6 +18,8 @@
 package org.apache.shardingsphere.data.pipeline.mysql.check.datasource;
 
 import org.apache.shardingsphere.data.pipeline.core.exception.job.PipelineJobPrepareFailedException;
+import org.apache.shardingsphere.data.pipeline.core.exception.job.PrepareJobWithInvalidSourceDataSourceException;
+import org.apache.shardingsphere.data.pipeline.core.exception.job.PrepareJobWithoutEnoughPrivilegeException;
 import org.junit.Before;
 import org.junit.Test;
 import org.junit.runner.RunWith;
@@ -73,7 +75,7 @@ public final class MySQLDataSourceCheckerTest {
         verify(preparedStatement).executeQuery();
     }
     
-    @Test(expected = PipelineJobPrepareFailedException.class)
+    @Test(expected = PrepareJobWithoutEnoughPrivilegeException.class)
     public void assertCheckPrivilegeLackPrivileges() {
         new MySQLDataSourceChecker().checkPrivilege(dataSources);
     }
@@ -92,7 +94,7 @@ public final class MySQLDataSourceCheckerTest {
         verify(preparedStatement, times(3)).executeQuery();
     }
     
-    @Test(expected = PipelineJobPrepareFailedException.class)
+    @Test(expected = PrepareJobWithInvalidSourceDataSourceException.class)
     public void assertCheckVariableWithWrongVariable() throws SQLException {
         when(resultSet.next()).thenReturn(true, true);
         when(resultSet.getString(2)).thenReturn("OFF", "ROW");
diff --git a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/check/datasource/AbstractDataSourceCheckerTest.java b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/check/datasource/AbstractDataSourceCheckerTest.java
index c8cdaa03f72..01e9745fc5d 100644
--- a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/check/datasource/AbstractDataSourceCheckerTest.java
+++ b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/check/datasource/AbstractDataSourceCheckerTest.java
@@ -20,7 +20,7 @@ package org.apache.shardingsphere.data.pipeline.core.check.datasource;
 import lombok.SneakyThrows;
 import org.apache.shardingsphere.data.pipeline.api.config.TableNameSchemaNameMapping;
 import org.apache.shardingsphere.data.pipeline.core.exception.job.PipelineJobPrepareFailedException;
-import org.apache.shardingsphere.data.pipeline.core.exception.job.TargetTableNotEmptyWhenPrepareMigrationException;
+import org.apache.shardingsphere.data.pipeline.core.exception.job.PrepareJobWithTargetTableNotEmptyException;
 import org.junit.Before;
 import org.junit.Test;
 import org.junit.runner.RunWith;
@@ -101,7 +101,7 @@ public final class AbstractDataSourceCheckerTest {
         dataSourceChecker.checkTargetTable(dataSources, new TableNameSchemaNameMapping(Collections.emptyMap()), Collections.singletonList("t_order"));
     }
     
-    @Test(expected = TargetTableNotEmptyWhenPrepareMigrationException.class)
+    @Test(expected = PrepareJobWithTargetTableNotEmptyException.class)
     public void assertCheckTargetTableFailed() throws SQLException {
         when(dataSource.getConnection()).thenReturn(connection);
         when(connection.prepareStatement("SELECT * FROM t_order LIMIT 1")).thenReturn(preparedStatement);