You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@shardingsphere.apache.org by zh...@apache.org on 2023/03/24 10:03:18 UTC

[shardingsphere] branch master updated: Refactor CDC exception (#24798)

This is an automated email from the ASF dual-hosted git repository.

zhonghongsheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new 8e0de8646a3 Refactor CDC exception (#24798)
8e0de8646a3 is described below

commit 8e0de8646a37cda49f63b4ee6d5f55f4ef77f93b
Author: Xinze Guo <10...@users.noreply.github.com>
AuthorDate: Fri Mar 24 18:03:09 2023 +0800

    Refactor CDC exception (#24798)
    
    * Refactor CDC exception module
    
    * Add doc
    
    * Fix
    
    * Improve
    
    * Stop job rather throw exception
---
 .../user-manual/error-code/sql-error-code.cn.md    |  2 +
 .../user-manual/error-code/sql-error-code.en.md    |  2 +
 .../cdc/client/handler/LoginRequestHandler.java    |  4 +-
 .../connector/SocketSinkImporterConnector.java     |  7 ++-
 .../cdc/exception/CDCExceptionWrapper.java}        | 17 +++--
 .../pipeline/cdc/exception/CDCLoginException.java} | 22 +++----
 .../cdc/exception/CDCServerException.java}         | 22 +++----
 .../cdc/generator/CDCResponseGenerator.java        |  5 +-
 .../backend/handler/cdc/CDCBackendHandler.java     | 31 +++++----
 .../frontend/netty/CDCChannelInboundHandler.java   | 73 ++++++++++------------
 .../netty/CDCChannelInboundHandlerTest.java        |  6 +-
 11 files changed, 87 insertions(+), 104 deletions(-)

diff --git a/docs/document/content/user-manual/error-code/sql-error-code.cn.md b/docs/document/content/user-manual/error-code/sql-error-code.cn.md
index 72d55d22532..f6757461744 100644
--- a/docs/document/content/user-manual/error-code/sql-error-code.cn.md
+++ b/docs/document/content/user-manual/error-code/sql-error-code.cn.md
@@ -134,6 +134,8 @@ SQL 错误码以标准的 SQL State,Vendor Code 和详细错误信息提供,
 | HY000     | 18095       | Can not find consistency check job of \`%s\`. |
 | HY000     | 18096       | Uncompleted consistency check job \`%s\` exists. |
 | HY000     | 18200       | Not find stream data source table. |
+| HY000     | 18201       | CDC server exception, reason is: %s. |
+| HY000     | 18202       | CDC login failed, reason is: %s |
 
 ### DistSQL
 
diff --git a/docs/document/content/user-manual/error-code/sql-error-code.en.md b/docs/document/content/user-manual/error-code/sql-error-code.en.md
index 2a289c3fd5c..e9b890fe8b0 100644
--- a/docs/document/content/user-manual/error-code/sql-error-code.en.md
+++ b/docs/document/content/user-manual/error-code/sql-error-code.en.md
@@ -134,6 +134,8 @@ SQL error codes provide by standard `SQL State`, `Vendor Code` and `Reason`, whi
 | HY000     | 18095       | Can not find consistency check job of \`%s\`. |
 | HY000     | 18096       | Uncompleted consistency check job \`%s\` exists. |
 | HY000     | 18200       | Not find stream data source table. |
+| HY000     | 18201       | CDC server exception, reason is: %s. |
+| HY000     | 18202       | CDC login failed, reason is: %s |
 
 ### DistSQL
 
diff --git a/kernel/data-pipeline/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/handler/LoginRequestHandler.java b/kernel/data-pipeline/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/handler/LoginRequestHandler.java
index 4f06830a576..af0c96e1c63 100644
--- a/kernel/data-pipeline/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/handler/LoginRequestHandler.java
+++ b/kernel/data-pipeline/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/handler/LoginRequestHandler.java
@@ -89,11 +89,11 @@ public final class LoginRequestHandler extends ChannelInboundHandlerAdapter {
     
     private void sendStreamDataEvent(final ChannelHandlerContext ctx, final CDCResponse response, final ClientConnectionContext connectionContext) {
         if (response.getStatus() == Status.SUCCEED) {
-            log.info("login success, username {}", username);
+            log.info("Login success, username {}", username);
             connectionContext.setStatus(ClientConnectionStatus.LOGGING_IN);
             ctx.fireUserEventTriggered(new StreamDataEvent());
         } else {
-            log.error("login failed, username {}", username);
+            log.error("Login failed, username: {}, error message: {}", username, response.getErrorMessage());
         }
     }
     
diff --git a/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/connector/SocketSinkImporterConnector.java b/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/connector/SocketSinkImporterConnector.java
index 7b0099e788f..b36191711e0 100644
--- a/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/connector/SocketSinkImporterConnector.java
+++ b/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/connector/SocketSinkImporterConnector.java
@@ -57,7 +57,7 @@ import java.util.concurrent.locks.ReentrantLock;
  * Socket sink importer connector.
  */
 @Slf4j
-public final class SocketSinkImporterConnector implements ImporterConnector {
+public final class SocketSinkImporterConnector implements ImporterConnector, AutoCloseable {
     
     private static final long DEFAULT_TIMEOUT_MILLISECONDS = 200L;
     
@@ -205,6 +205,11 @@ public final class SocketSinkImporterConnector implements ImporterConnector {
         return CDCSinkType.SOCKET.name();
     }
     
+    @Override
+    public void close() throws Exception {
+        channel.close();
+    }
+    
     @RequiredArgsConstructor
     private final class CDCIncrementalImporterTask implements Runnable {
         
diff --git a/kernel/data-pipeline/cdc/protocol/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/common/CDCResponseErrorCode.java b/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/exception/CDCExceptionWrapper.java
similarity index 72%
copy from kernel/data-pipeline/cdc/protocol/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/common/CDCResponseErrorCode.java
copy to kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/exception/CDCExceptionWrapper.java
index 2498f351d90..e4064c9b52c 100644
--- a/kernel/data-pipeline/cdc/protocol/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/common/CDCResponseErrorCode.java
+++ b/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/exception/CDCExceptionWrapper.java
@@ -15,23 +15,20 @@
  * limitations under the License.
  */
 
-package org.apache.shardingsphere.data.pipeline.cdc.common;
+package org.apache.shardingsphere.data.pipeline.cdc.exception;
 
 import lombok.Getter;
 import lombok.RequiredArgsConstructor;
+import org.apache.shardingsphere.infra.util.exception.external.sql.ShardingSphereSQLException;
 
 /**
- * CDC response error code.
+ * CDC exception wrapper.
  */
 @RequiredArgsConstructor
-public enum CDCResponseErrorCode {
+@Getter
+public final class CDCExceptionWrapper extends RuntimeException {
     
-    SERVER_ERROR("1"),
+    private final String requestId;
     
-    ILLEGAL_REQUEST_ERROR("2"),
-    
-    ILLEGAL_USERNAME_OR_PASSWORD("3");
-    
-    @Getter
-    private final String code;
+    private final ShardingSphereSQLException exception;
 }
diff --git a/kernel/data-pipeline/cdc/protocol/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/common/CDCResponseErrorCode.java b/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/exception/CDCLoginException.java
similarity index 62%
copy from kernel/data-pipeline/cdc/protocol/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/common/CDCResponseErrorCode.java
copy to kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/exception/CDCLoginException.java
index 2498f351d90..fda62711db5 100644
--- a/kernel/data-pipeline/cdc/protocol/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/common/CDCResponseErrorCode.java
+++ b/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/exception/CDCLoginException.java
@@ -15,23 +15,17 @@
  * limitations under the License.
  */
 
-package org.apache.shardingsphere.data.pipeline.cdc.common;
+package org.apache.shardingsphere.data.pipeline.cdc.exception;
 
-import lombok.Getter;
-import lombok.RequiredArgsConstructor;
+import org.apache.shardingsphere.data.pipeline.core.exception.PipelineSQLException;
+import org.apache.shardingsphere.infra.util.exception.external.sql.sqlstate.XOpenSQLState;
 
 /**
- * CDC response error code.
+ * CDC login exception.
  */
-@RequiredArgsConstructor
-public enum CDCResponseErrorCode {
+public final class CDCLoginException extends PipelineSQLException {
     
-    SERVER_ERROR("1"),
-    
-    ILLEGAL_REQUEST_ERROR("2"),
-    
-    ILLEGAL_USERNAME_OR_PASSWORD("3");
-    
-    @Getter
-    private final String code;
+    public CDCLoginException(final String reason) {
+        super(XOpenSQLState.GENERAL_ERROR, 202, String.format("CDC login failed, reason is: %s", reason));
+    }
 }
diff --git a/kernel/data-pipeline/cdc/protocol/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/common/CDCResponseErrorCode.java b/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/exception/CDCServerException.java
similarity index 61%
rename from kernel/data-pipeline/cdc/protocol/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/common/CDCResponseErrorCode.java
rename to kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/exception/CDCServerException.java
index 2498f351d90..11626806f1b 100644
--- a/kernel/data-pipeline/cdc/protocol/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/common/CDCResponseErrorCode.java
+++ b/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/exception/CDCServerException.java
@@ -15,23 +15,17 @@
  * limitations under the License.
  */
 
-package org.apache.shardingsphere.data.pipeline.cdc.common;
+package org.apache.shardingsphere.data.pipeline.cdc.exception;
 
-import lombok.Getter;
-import lombok.RequiredArgsConstructor;
+import org.apache.shardingsphere.data.pipeline.core.exception.PipelineSQLException;
+import org.apache.shardingsphere.infra.util.exception.external.sql.sqlstate.XOpenSQLState;
 
 /**
- * CDC response error code.
+ * CDC server exception.
  */
-@RequiredArgsConstructor
-public enum CDCResponseErrorCode {
+public final class CDCServerException extends PipelineSQLException {
     
-    SERVER_ERROR("1"),
-    
-    ILLEGAL_REQUEST_ERROR("2"),
-    
-    ILLEGAL_USERNAME_OR_PASSWORD("3");
-    
-    @Getter
-    private final String code;
+    public CDCServerException(final String reason) {
+        super(XOpenSQLState.GENERAL_ERROR, 201, String.format("CDC server exception, reason is: %s.", reason));
+    }
 }
diff --git a/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/generator/CDCResponseGenerator.java b/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/generator/CDCResponseGenerator.java
index dc24261e26b..3466bd5e3b5 100644
--- a/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/generator/CDCResponseGenerator.java
+++ b/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/generator/CDCResponseGenerator.java
@@ -17,7 +17,6 @@
 
 package org.apache.shardingsphere.data.pipeline.cdc.generator;
 
-import org.apache.shardingsphere.data.pipeline.cdc.common.CDCResponseErrorCode;
 import org.apache.shardingsphere.data.pipeline.cdc.protocol.response.CDCResponse;
 import org.apache.shardingsphere.data.pipeline.cdc.protocol.response.CDCResponse.Builder;
 import org.apache.shardingsphere.data.pipeline.cdc.protocol.response.CDCResponse.Status;
@@ -45,7 +44,7 @@ public final class CDCResponseGenerator {
      * @param errorMessage error message
      * @return failed response
      */
-    public static CDCResponse failed(final String requestId, final CDCResponseErrorCode errorCode, final String errorMessage) {
-        return CDCResponse.newBuilder().setStatus(Status.FAILED).setRequestId(requestId).setErrorCode(errorCode.getCode()).setErrorMessage(errorMessage).build();
+    public static CDCResponse failed(final String requestId, final String errorCode, final String errorMessage) {
+        return CDCResponse.newBuilder().setStatus(Status.FAILED).setRequestId(requestId).setErrorCode(errorCode).setErrorMessage(errorMessage).build();
     }
 }
diff --git a/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/handler/cdc/CDCBackendHandler.java b/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/handler/cdc/CDCBackendHandler.java
index 0e513182ae6..07088a2818a 100644
--- a/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/handler/cdc/CDCBackendHandler.java
+++ b/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/handler/cdc/CDCBackendHandler.java
@@ -23,13 +23,14 @@ import lombok.extern.slf4j.Slf4j;
 import org.apache.shardingsphere.data.pipeline.api.ingest.record.DataRecord;
 import org.apache.shardingsphere.data.pipeline.cdc.api.impl.CDCJobAPI;
 import org.apache.shardingsphere.data.pipeline.cdc.api.pojo.StreamDataParameter;
-import org.apache.shardingsphere.data.pipeline.cdc.common.CDCResponseErrorCode;
 import org.apache.shardingsphere.data.pipeline.cdc.config.job.CDCJobConfiguration;
 import org.apache.shardingsphere.data.pipeline.cdc.constant.CDCSinkType;
 import org.apache.shardingsphere.data.pipeline.cdc.context.CDCConnectionContext;
 import org.apache.shardingsphere.data.pipeline.cdc.core.ack.CDCAckHolder;
 import org.apache.shardingsphere.data.pipeline.cdc.core.connector.SocketSinkImporterConnector;
 import org.apache.shardingsphere.data.pipeline.cdc.core.job.CDCJob;
+import org.apache.shardingsphere.data.pipeline.cdc.exception.CDCExceptionWrapper;
+import org.apache.shardingsphere.data.pipeline.cdc.exception.CDCServerException;
 import org.apache.shardingsphere.data.pipeline.cdc.exception.NotFindStreamDataSourceTableException;
 import org.apache.shardingsphere.data.pipeline.cdc.generator.CDCResponseGenerator;
 import org.apache.shardingsphere.data.pipeline.cdc.generator.DataRecordComparatorGenerator;
@@ -42,6 +43,7 @@ import org.apache.shardingsphere.data.pipeline.cdc.util.CDCSchemaTableUtil;
 import org.apache.shardingsphere.data.pipeline.cdc.util.CDCTableRuleUtil;
 import org.apache.shardingsphere.data.pipeline.core.api.PipelineAPIFactory;
 import org.apache.shardingsphere.data.pipeline.core.context.PipelineContext;
+import org.apache.shardingsphere.data.pipeline.core.exception.job.PipelineJobNotFoundException;
 import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobCenter;
 import org.apache.shardingsphere.elasticjob.infra.pojo.JobConfigurationPOJO;
 import org.apache.shardingsphere.elasticjob.lite.api.bootstrap.impl.OneOffJobBootstrap;
@@ -58,7 +60,6 @@ import java.util.HashSet;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
-import java.util.Optional;
 import java.util.Properties;
 import java.util.Set;
 import java.util.stream.Collectors;
@@ -83,7 +84,7 @@ public final class CDCBackendHandler {
     public CDCResponse streamData(final String requestId, final StreamDataRequestBody requestBody, final CDCConnectionContext connectionContext, final Channel channel) {
         ShardingSphereDatabase database = PipelineContext.getContextManager().getMetaDataContexts().getMetaData().getDatabase(requestBody.getDatabase());
         if (null == database) {
-            return CDCResponseGenerator.failed(requestId, CDCResponseErrorCode.SERVER_ERROR, String.format("%s database is not exists", requestBody.getDatabase()));
+            throw new CDCExceptionWrapper(requestId, new CDCServerException(String.format("%s database is not exists", requestBody.getDatabase())));
         }
         Map<String, Set<String>> schemaTableNameMap;
         Collection<String> tableNames;
@@ -99,22 +100,19 @@ public final class CDCBackendHandler {
             tableNames = schemaTableNames;
         }
         if (tableNames.isEmpty()) {
-            throw new NotFindStreamDataSourceTableException();
-        }
-        Optional<ShardingRule> shardingRule = database.getRuleMetaData().findSingleRule(ShardingRule.class);
-        if (!shardingRule.isPresent()) {
-            return CDCResponseGenerator.failed(requestId, CDCResponseErrorCode.SERVER_ERROR, "Not find sharding rule");
+            throw new CDCExceptionWrapper(requestId, new NotFindStreamDataSourceTableException());
         }
+        ShardingRule shardingRule = database.getRuleMetaData().getSingleRule(ShardingRule.class);
         Map<String, List<DataNode>> actualDataNodesMap = new HashMap<>();
         // TODO need support case-insensitive later
         for (String each : tableNames) {
-            actualDataNodesMap.put(each, CDCTableRuleUtil.getActualDataNodes(shardingRule.get(), each));
+            actualDataNodesMap.put(each, CDCTableRuleUtil.getActualDataNodes(shardingRule, each));
         }
         boolean decodeWithTx = database.getProtocolType() instanceof OpenGaussDatabaseType;
         StreamDataParameter parameter = new StreamDataParameter(requestBody.getDatabase(), new LinkedList<>(schemaTableNames), requestBody.getFull(), actualDataNodesMap, decodeWithTx);
         String jobId = jobAPI.createJob(parameter, CDCSinkType.SOCKET, new Properties());
         connectionContext.setJobId(jobId);
-        startStreaming(requestId, jobId, connectionContext, channel);
+        startStreaming(jobId, connectionContext, channel);
         return CDCResponseGenerator.succeedBuilder(requestId).setStreamDataResult(StreamDataResult.newBuilder().setStreamingId(jobId).build()).build();
     }
     
@@ -131,17 +129,17 @@ public final class CDCBackendHandler {
     /**
      * Start streaming.
      *
-     * @param requestId request id
      * @param jobId job id
      * @param channel channel
      * @param connectionContext connection context
-     * @return CDC response
      */
-    // TODO not return CDCResponse
-    public CDCResponse startStreaming(final String requestId, final String jobId, final CDCConnectionContext connectionContext, final Channel channel) {
+    public void startStreaming(final String jobId, final CDCConnectionContext connectionContext, final Channel channel) {
         CDCJobConfiguration cdcJobConfig = jobAPI.getJobConfiguration(jobId);
         if (null == cdcJobConfig) {
-            return CDCResponseGenerator.failed(jobId, CDCResponseErrorCode.ILLEGAL_REQUEST_ERROR, String.format("the %s job config doesn't exist", jobId));
+            throw new PipelineJobNotFoundException(jobId);
+        }
+        if (PipelineJobCenter.isJobExisting(jobId)) {
+            PipelineJobCenter.stop(jobId);
         }
         JobConfigurationPOJO jobConfigPOJO = PipelineAPIFactory.getJobConfigurationAPI().getJobConfiguration(jobId);
         // TODO, ensure that there is only one consumer at a time, job config disable may not be updated when the program is forced to close
@@ -152,12 +150,11 @@ public final class CDCBackendHandler {
                 ? DataRecordComparatorGenerator.generatorIncrementalComparator(database.getProtocolType())
                 : null;
         CDCJob job = new CDCJob(new SocketSinkImporterConnector(channel, database, cdcJobConfig.getJobShardingCount(), cdcJobConfig.getSchemaTableNames(), dataRecordComparator));
-        PipelineJobCenter.addJob(jobConfigPOJO.getJobName(), job);
+        PipelineJobCenter.addJob(jobId, job);
         OneOffJobBootstrap oneOffJobBootstrap = new OneOffJobBootstrap(PipelineAPIFactory.getRegistryCenter(), job, jobConfigPOJO.toJobConfiguration());
         job.setJobBootstrap(oneOffJobBootstrap);
         oneOffJobBootstrap.execute();
         connectionContext.setJobId(jobId);
-        return CDCResponseGenerator.succeedBuilder(requestId).build();
     }
     
     /**
diff --git a/proxy/frontend/core/src/main/java/org/apache/shardingsphere/proxy/frontend/netty/CDCChannelInboundHandler.java b/proxy/frontend/core/src/main/java/org/apache/shardingsphere/proxy/frontend/netty/CDCChannelInboundHandler.java
index bad92ede984..f48f2bfd42d 100644
--- a/proxy/frontend/core/src/main/java/org/apache/shardingsphere/proxy/frontend/netty/CDCChannelInboundHandler.java
+++ b/proxy/frontend/core/src/main/java/org/apache/shardingsphere/proxy/frontend/netty/CDCChannelInboundHandler.java
@@ -26,9 +26,11 @@ import io.netty.util.AttributeKey;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.shardingsphere.authority.model.ShardingSpherePrivileges;
 import org.apache.shardingsphere.authority.rule.AuthorityRule;
-import org.apache.shardingsphere.data.pipeline.cdc.common.CDCResponseErrorCode;
 import org.apache.shardingsphere.data.pipeline.cdc.constant.CDCConnectionStatus;
 import org.apache.shardingsphere.data.pipeline.cdc.context.CDCConnectionContext;
+import org.apache.shardingsphere.data.pipeline.cdc.exception.CDCExceptionWrapper;
+import org.apache.shardingsphere.data.pipeline.cdc.exception.CDCLoginException;
+import org.apache.shardingsphere.data.pipeline.cdc.exception.CDCServerException;
 import org.apache.shardingsphere.data.pipeline.cdc.generator.CDCResponseGenerator;
 import org.apache.shardingsphere.data.pipeline.cdc.protocol.request.AckStreamingRequestBody;
 import org.apache.shardingsphere.data.pipeline.cdc.protocol.request.CDCRequest;
@@ -39,6 +41,7 @@ import org.apache.shardingsphere.data.pipeline.cdc.protocol.request.StopStreamin
 import org.apache.shardingsphere.data.pipeline.cdc.protocol.request.StreamDataRequestBody;
 import org.apache.shardingsphere.data.pipeline.cdc.protocol.response.CDCResponse;
 import org.apache.shardingsphere.data.pipeline.cdc.protocol.response.ServerGreetingResult;
+import org.apache.shardingsphere.data.pipeline.core.exception.param.PipelineInvalidParameterException;
 import org.apache.shardingsphere.distsql.handler.exception.rule.MissingRequiredRuleException;
 import org.apache.shardingsphere.infra.autogen.version.ShardingSphereVersion;
 import org.apache.shardingsphere.infra.executor.audit.exception.SQLAuditException;
@@ -46,6 +49,7 @@ import org.apache.shardingsphere.infra.metadata.user.Grantee;
 import org.apache.shardingsphere.infra.metadata.user.ShardingSphereUser;
 import org.apache.shardingsphere.infra.util.exception.ShardingSpherePreconditions;
 import org.apache.shardingsphere.infra.util.exception.external.sql.ShardingSphereSQLException;
+import org.apache.shardingsphere.infra.util.exception.external.sql.sqlstate.XOpenSQLState;
 import org.apache.shardingsphere.proxy.backend.context.ProxyContext;
 import org.apache.shardingsphere.proxy.backend.handler.cdc.CDCBackendHandler;
 
@@ -90,12 +94,12 @@ public final class CDCChannelInboundHandler extends ChannelInboundHandlerAdapter
         // TODO add CDC exception to wrapper this exception, and add the parameters requestId and whether to close connect
         CDCConnectionContext connectionContext = ctx.channel().attr(CONNECTION_CONTEXT_KEY).get();
         ChannelFuture channelFuture;
-        if (cause instanceof ShardingSphereSQLException) {
-            SQLException sqlException = ((ShardingSphereSQLException) cause).toSQLException();
-            String errorMessage = String.format("ERROR %s (%s): %s", sqlException.getErrorCode(), sqlException.getSQLState(), sqlException.getMessage());
-            channelFuture = ctx.writeAndFlush(CDCResponseGenerator.failed("", CDCResponseErrorCode.SERVER_ERROR, errorMessage));
+        if (cause instanceof CDCExceptionWrapper) {
+            CDCExceptionWrapper wrapper = (CDCExceptionWrapper) cause;
+            ShardingSphereSQLException exception = wrapper.getException();
+            channelFuture = ctx.writeAndFlush(CDCResponseGenerator.failed(wrapper.getRequestId(), exception.toSQLException().getSQLState(), exception.getMessage()));
         } else {
-            channelFuture = ctx.writeAndFlush(CDCResponseGenerator.failed("", CDCResponseErrorCode.SERVER_ERROR, cause.getMessage()));
+            channelFuture = ctx.writeAndFlush(CDCResponseGenerator.failed("", XOpenSQLState.GENERAL_ERROR.getValue(), cause.getMessage()));
         }
         if (CDCConnectionStatus.NOT_LOGGED_IN == connectionContext.getStatus()) {
             channelFuture.addListener(ChannelFutureListener.CLOSE);
@@ -116,7 +120,7 @@ public final class CDCChannelInboundHandler extends ChannelInboundHandlerAdapter
                 processStreamDataRequest(ctx, request, connectionContext);
                 break;
             case ACK_STREAMING:
-                processAckStreamingRequest(ctx, request);
+                processAckStreamingRequest(request);
                 break;
             case STOP_STREAMING:
                 processStopStreamingRequest(ctx, request, connectionContext);
@@ -134,8 +138,7 @@ public final class CDCChannelInboundHandler extends ChannelInboundHandlerAdapter
     
     private void processLogin(final ChannelHandlerContext ctx, final CDCRequest request, final CDCConnectionContext connectionContext) {
         if (!request.hasLoginRequestBody() || !request.getLoginRequestBody().hasBasicBody()) {
-            ctx.writeAndFlush(CDCResponseGenerator.failed(request.getRequestId(), CDCResponseErrorCode.ILLEGAL_REQUEST_ERROR, "Miss login request body")).addListener(ChannelFutureListener.CLOSE);
-            return;
+            throw new CDCExceptionWrapper(request.getRequestId(), new PipelineInvalidParameterException("Login request body is empty"));
         }
         BasicBody body = request.getLoginRequestBody().getBasicBody();
         AuthorityRule authorityRule = ProxyContext.getInstance().getContextManager().getMetaDataContexts().getMetaData().getGlobalRuleMetaData().getSingleRule(AuthorityRule.class);
@@ -145,17 +148,17 @@ public final class CDCChannelInboundHandler extends ChannelInboundHandlerAdapter
             connectionContext.setCurrentUser(user.get());
             ctx.writeAndFlush(CDCResponseGenerator.succeedBuilder(request.getRequestId()).build());
         } else {
-            ctx.writeAndFlush(CDCResponseGenerator.failed(request.getRequestId(), CDCResponseErrorCode.ILLEGAL_USERNAME_OR_PASSWORD, "Illegal username or password"))
-                    .addListener(ChannelFutureListener.CLOSE);
+            throw new CDCExceptionWrapper(request.getRequestId(), new CDCLoginException("Illegal username or password"));
         }
     }
     
-    private void checkPrivileges(final Grantee grantee, final String currentDatabase) {
+    private void checkPrivileges(final String requestId, final Grantee grantee, final String currentDatabase) {
         AuthorityRule authorityRule = ProxyContext.getInstance().getContextManager().getMetaDataContexts().getMetaData().getGlobalRuleMetaData().findSingleRule(AuthorityRule.class)
-                .orElseThrow(() -> new MissingRequiredRuleException("authority"));
+                .orElseThrow(() -> new CDCExceptionWrapper(requestId, new MissingRequiredRuleException("authority")));
         ShardingSpherePrivileges privileges = authorityRule.findPrivileges(grantee)
-                .orElseThrow(() -> new SQLAuditException(String.format("Access denied for user '%s'@'%s'", grantee.getUsername(), grantee.getHostname())));
-        ShardingSpherePreconditions.checkState(privileges.hasPrivileges(currentDatabase), () -> new SQLAuditException(String.format("Unknown database '%s'", currentDatabase)));
+                .orElseThrow(() -> new CDCExceptionWrapper(requestId, new SQLAuditException(String.format("Access denied for user '%s'@'%s'", grantee.getUsername(), grantee.getHostname()))));
+        ShardingSpherePreconditions.checkState(privileges.hasPrivileges(currentDatabase),
+                () -> new CDCExceptionWrapper(requestId, new SQLAuditException(String.format("Unknown database '%s'", currentDatabase))));
     }
     
     private String getHostAddress(final ChannelHandlerContext context) {
@@ -165,59 +168,49 @@ public final class CDCChannelInboundHandler extends ChannelInboundHandlerAdapter
     
     private void processStreamDataRequest(final ChannelHandlerContext ctx, final CDCRequest request, final CDCConnectionContext connectionContext) {
         if (!request.hasStreamDataRequestBody()) {
-            ctx.writeAndFlush(CDCResponseGenerator.failed(request.getRequestId(), CDCResponseErrorCode.ILLEGAL_REQUEST_ERROR, "Miss stream data request body"));
-            return;
+            throw new CDCExceptionWrapper(request.getRequestId(), new PipelineInvalidParameterException("Stream data request body is empty"));
         }
         StreamDataRequestBody requestBody = request.getStreamDataRequestBody();
         if (requestBody.getDatabase().isEmpty()) {
-            ctx.writeAndFlush(CDCResponseGenerator.failed(request.getRequestId(), CDCResponseErrorCode.ILLEGAL_REQUEST_ERROR, "database not allowed to be empty"));
-            return;
+            throw new CDCExceptionWrapper(request.getRequestId(), new PipelineInvalidParameterException("Database is empty"));
         }
         if (requestBody.getSourceSchemaTableList().isEmpty()) {
-            ctx.writeAndFlush(CDCResponseGenerator.failed(request.getRequestId(), CDCResponseErrorCode.ILLEGAL_REQUEST_ERROR, "Illegal stream data request parameter"));
-            return;
+            throw new CDCExceptionWrapper(request.getRequestId(), new PipelineInvalidParameterException("Source schema table is empty"));
         }
-        checkPrivileges(connectionContext.getCurrentUser().getGrantee(), requestBody.getDatabase());
+        checkPrivileges(request.getRequestId(), connectionContext.getCurrentUser().getGrantee(), requestBody.getDatabase());
         CDCResponse response = backendHandler.streamData(request.getRequestId(), requestBody, connectionContext, ctx.channel());
         ctx.writeAndFlush(response);
     }
     
-    private void processAckStreamingRequest(final ChannelHandlerContext ctx, final CDCRequest request) {
+    private void processAckStreamingRequest(final CDCRequest request) {
         if (!request.hasAckStreamingRequestBody()) {
-            ctx.writeAndFlush(CDCResponseGenerator.failed(request.getRequestId(), CDCResponseErrorCode.ILLEGAL_REQUEST_ERROR, "Miss ack request body")).addListener(ChannelFutureListener.CLOSE);
-            return;
+            throw new CDCExceptionWrapper(request.getRequestId(), new PipelineInvalidParameterException("Ack request body is empty"));
         }
         AckStreamingRequestBody requestBody = request.getAckStreamingRequestBody();
         if (requestBody.getAckId().isEmpty()) {
-            ctx.writeAndFlush(CDCResponseGenerator.failed(request.getRequestId(), CDCResponseErrorCode.ILLEGAL_REQUEST_ERROR, "Illegal ack request parameter"));
-            return;
+            throw new CDCExceptionWrapper(request.getRequestId(), new PipelineInvalidParameterException("Ack request is empty"));
         }
         backendHandler.processAck(requestBody);
     }
     
     private void processStartStreamingRequest(final ChannelHandlerContext ctx, final CDCRequest request, final CDCConnectionContext connectionContext) {
         if (!request.hasStartStreamingRequestBody()) {
-            ctx.writeAndFlush(CDCResponseGenerator.failed(request.getRequestId(), CDCResponseErrorCode.ILLEGAL_REQUEST_ERROR, "Miss start streaming request body"))
-                    .addListener(ChannelFutureListener.CLOSE);
-            return;
+            throw new CDCExceptionWrapper(request.getRequestId(), new PipelineInvalidParameterException("Start streaming request body is empty"));
         }
         StartStreamingRequestBody requestBody = request.getStartStreamingRequestBody();
-        // TODO improve after cdc exception refactor
         if (requestBody.getStreamingId().isEmpty()) {
-            ctx.writeAndFlush(CDCResponseGenerator.failed(request.getRequestId(), CDCResponseErrorCode.ILLEGAL_REQUEST_ERROR, "Illegal start streaming request parameter"))
-                    .addListener(ChannelFutureListener.CLOSE);
-            return;
+            throw new CDCExceptionWrapper(request.getRequestId(), new PipelineInvalidParameterException("Streaming id is empty"));
         }
         String database = backendHandler.getDatabaseNameByJobId(requestBody.getStreamingId());
-        checkPrivileges(connectionContext.getCurrentUser().getGrantee(), database);
-        CDCResponse response = backendHandler.startStreaming(request.getRequestId(), requestBody.getStreamingId(), connectionContext, ctx.channel());
-        ctx.writeAndFlush(response);
+        checkPrivileges(request.getRequestId(), connectionContext.getCurrentUser().getGrantee(), database);
+        backendHandler.startStreaming(requestBody.getStreamingId(), connectionContext, ctx.channel());
+        ctx.writeAndFlush(CDCResponseGenerator.succeedBuilder(request.getRequestId()).build());
     }
     
     private void processStopStreamingRequest(final ChannelHandlerContext ctx, final CDCRequest request, final CDCConnectionContext connectionContext) {
         StopStreamingRequestBody requestBody = request.getStopStreamingRequestBody();
         String database = backendHandler.getDatabaseNameByJobId(requestBody.getStreamingId());
-        checkPrivileges(connectionContext.getCurrentUser().getGrantee(), database);
+        checkPrivileges(request.getRequestId(), connectionContext.getCurrentUser().getGrantee(), database);
         backendHandler.stopStreaming(connectionContext.getJobId());
         connectionContext.setStatus(CDCConnectionStatus.LOGGED_IN);
         connectionContext.setJobId(null);
@@ -227,14 +220,14 @@ public final class CDCChannelInboundHandler extends ChannelInboundHandlerAdapter
     private void processDropStreamingRequest(final ChannelHandlerContext ctx, final CDCRequest request, final CDCConnectionContext connectionContext) {
         DropStreamingRequestBody requestBody = request.getDropStreamingRequestBody();
         String database = backendHandler.getDatabaseNameByJobId(requestBody.getStreamingId());
-        checkPrivileges(connectionContext.getCurrentUser().getGrantee(), database);
+        checkPrivileges(request.getRequestId(), connectionContext.getCurrentUser().getGrantee(), database);
         try {
             backendHandler.dropStreaming(connectionContext.getJobId());
             connectionContext.setStatus(CDCConnectionStatus.LOGGED_IN);
             connectionContext.setJobId(null);
             ctx.writeAndFlush(CDCResponseGenerator.succeedBuilder(request.getRequestId()).build());
         } catch (final SQLException ex) {
-            ctx.writeAndFlush(CDCResponseGenerator.failed(request.getRequestId(), CDCResponseErrorCode.SERVER_ERROR, ex.getMessage()));
+            throw new CDCExceptionWrapper(request.getRequestId(), new CDCServerException(ex.getMessage()));
         }
     }
 }
diff --git a/proxy/frontend/core/src/test/java/org/apache/shardingsphere/proxy/frontend/netty/CDCChannelInboundHandlerTest.java b/proxy/frontend/core/src/test/java/org/apache/shardingsphere/proxy/frontend/netty/CDCChannelInboundHandlerTest.java
index 69c5a284d06..a71dd514145 100644
--- a/proxy/frontend/core/src/test/java/org/apache/shardingsphere/proxy/frontend/netty/CDCChannelInboundHandlerTest.java
+++ b/proxy/frontend/core/src/test/java/org/apache/shardingsphere/proxy/frontend/netty/CDCChannelInboundHandlerTest.java
@@ -21,7 +21,6 @@ import com.google.common.hash.Hashing;
 import io.netty.channel.embedded.EmbeddedChannel;
 import io.netty.handler.logging.LoggingHandler;
 import org.apache.shardingsphere.authority.rule.AuthorityRule;
-import org.apache.shardingsphere.data.pipeline.cdc.common.CDCResponseErrorCode;
 import org.apache.shardingsphere.data.pipeline.cdc.protocol.request.CDCRequest;
 import org.apache.shardingsphere.data.pipeline.cdc.protocol.request.CDCRequest.Builder;
 import org.apache.shardingsphere.data.pipeline.cdc.protocol.request.CDCRequest.Type;
@@ -31,6 +30,7 @@ import org.apache.shardingsphere.data.pipeline.cdc.protocol.response.CDCResponse
 import org.apache.shardingsphere.data.pipeline.cdc.protocol.response.CDCResponse.Status;
 import org.apache.shardingsphere.infra.metadata.database.rule.ShardingSphereRuleMetaData;
 import org.apache.shardingsphere.infra.metadata.user.ShardingSphereUser;
+import org.apache.shardingsphere.infra.util.exception.external.sql.sqlstate.XOpenSQLState;
 import org.apache.shardingsphere.mode.manager.ContextManager;
 import org.apache.shardingsphere.proxy.backend.context.ProxyContext;
 import org.apache.shardingsphere.test.mock.AutoMockExtension;
@@ -88,7 +88,7 @@ public final class CDCChannelInboundHandlerTest {
         assertTrue(expectedGreetingResult.hasServerGreetingResult());
         CDCResponse expectedLoginResult = channel.readOutbound();
         assertThat(expectedLoginResult.getStatus(), is(Status.FAILED));
-        assertThat(expectedLoginResult.getErrorCode(), is(CDCResponseErrorCode.ILLEGAL_USERNAME_OR_PASSWORD.getCode()));
+        assertThat(expectedLoginResult.getErrorCode(), is(XOpenSQLState.GENERAL_ERROR.getValue()));
         assertFalse(channel.isOpen());
     }
     
@@ -100,7 +100,7 @@ public final class CDCChannelInboundHandlerTest {
         assertTrue(expectedGreetingResult.hasServerGreetingResult());
         CDCResponse expectedLoginResult = channel.readOutbound();
         assertThat(expectedLoginResult.getStatus(), is(Status.FAILED));
-        assertThat(expectedLoginResult.getErrorCode(), is(CDCResponseErrorCode.ILLEGAL_REQUEST_ERROR.getCode()));
+        assertThat(expectedLoginResult.getErrorCode(), is(XOpenSQLState.INVALID_PARAMETER_VALUE.getValue()));
         assertFalse(channel.isOpen());
     }