You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@shardingsphere.apache.org by zh...@apache.org on 2023/03/24 10:03:18 UTC
[shardingsphere] branch master updated: Refactor CDC exception (#24798)
This is an automated email from the ASF dual-hosted git repository.
zhonghongsheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new 8e0de8646a3 Refactor CDC exception (#24798)
8e0de8646a3 is described below
commit 8e0de8646a37cda49f63b4ee6d5f55f4ef77f93b
Author: Xinze Guo <10...@users.noreply.github.com>
AuthorDate: Fri Mar 24 18:03:09 2023 +0800
Refactor CDC exception (#24798)
* Refactor CDC exception module
* Add doc
* Fix
* Improve
* Stop job rather than throw exception
---
.../user-manual/error-code/sql-error-code.cn.md | 2 +
.../user-manual/error-code/sql-error-code.en.md | 2 +
.../cdc/client/handler/LoginRequestHandler.java | 4 +-
.../connector/SocketSinkImporterConnector.java | 7 ++-
.../cdc/exception/CDCExceptionWrapper.java} | 17 +++--
.../pipeline/cdc/exception/CDCLoginException.java} | 22 +++----
.../cdc/exception/CDCServerException.java} | 22 +++----
.../cdc/generator/CDCResponseGenerator.java | 5 +-
.../backend/handler/cdc/CDCBackendHandler.java | 31 +++++----
.../frontend/netty/CDCChannelInboundHandler.java | 73 ++++++++++------------
.../netty/CDCChannelInboundHandlerTest.java | 6 +-
11 files changed, 87 insertions(+), 104 deletions(-)
diff --git a/docs/document/content/user-manual/error-code/sql-error-code.cn.md b/docs/document/content/user-manual/error-code/sql-error-code.cn.md
index 72d55d22532..f6757461744 100644
--- a/docs/document/content/user-manual/error-code/sql-error-code.cn.md
+++ b/docs/document/content/user-manual/error-code/sql-error-code.cn.md
@@ -134,6 +134,8 @@ SQL 错误码以标准的 SQL State,Vendor Code 和详细错误信息提供,
| HY000 | 18095 | Can not find consistency check job of \`%s\`. |
| HY000 | 18096 | Uncompleted consistency check job \`%s\` exists. |
| HY000 | 18200 | Not find stream data source table. |
+| HY000 | 18201 | CDC server exception, reason is: %s. |
+| HY000 | 18202 | CDC login failed, reason is: %s |
### DistSQL
diff --git a/docs/document/content/user-manual/error-code/sql-error-code.en.md b/docs/document/content/user-manual/error-code/sql-error-code.en.md
index 2a289c3fd5c..e9b890fe8b0 100644
--- a/docs/document/content/user-manual/error-code/sql-error-code.en.md
+++ b/docs/document/content/user-manual/error-code/sql-error-code.en.md
@@ -134,6 +134,8 @@ SQL error codes provide by standard `SQL State`, `Vendor Code` and `Reason`, whi
| HY000 | 18095 | Can not find consistency check job of \`%s\`. |
| HY000 | 18096 | Uncompleted consistency check job \`%s\` exists. |
| HY000 | 18200 | Not find stream data source table. |
+| HY000 | 18201 | CDC server exception, reason is: %s. |
+| HY000 | 18202 | CDC login failed, reason is: %s |
### DistSQL
diff --git a/kernel/data-pipeline/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/handler/LoginRequestHandler.java b/kernel/data-pipeline/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/handler/LoginRequestHandler.java
index 4f06830a576..af0c96e1c63 100644
--- a/kernel/data-pipeline/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/handler/LoginRequestHandler.java
+++ b/kernel/data-pipeline/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/handler/LoginRequestHandler.java
@@ -89,11 +89,11 @@ public final class LoginRequestHandler extends ChannelInboundHandlerAdapter {
private void sendStreamDataEvent(final ChannelHandlerContext ctx, final CDCResponse response, final ClientConnectionContext connectionContext) {
if (response.getStatus() == Status.SUCCEED) {
- log.info("login success, username {}", username);
+ log.info("Login success, username {}", username);
connectionContext.setStatus(ClientConnectionStatus.LOGGING_IN);
ctx.fireUserEventTriggered(new StreamDataEvent());
} else {
- log.error("login failed, username {}", username);
+ log.error("Login failed, username: {}, error message: {}", username, response.getErrorMessage());
}
}
diff --git a/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/connector/SocketSinkImporterConnector.java b/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/connector/SocketSinkImporterConnector.java
index 7b0099e788f..b36191711e0 100644
--- a/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/connector/SocketSinkImporterConnector.java
+++ b/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/connector/SocketSinkImporterConnector.java
@@ -57,7 +57,7 @@ import java.util.concurrent.locks.ReentrantLock;
* Socket sink importer connector.
*/
@Slf4j
-public final class SocketSinkImporterConnector implements ImporterConnector {
+public final class SocketSinkImporterConnector implements ImporterConnector, AutoCloseable {
private static final long DEFAULT_TIMEOUT_MILLISECONDS = 200L;
@@ -205,6 +205,11 @@ public final class SocketSinkImporterConnector implements ImporterConnector {
return CDCSinkType.SOCKET.name();
}
+ @Override
+ public void close() throws Exception {
+ channel.close();
+ }
+
@RequiredArgsConstructor
private final class CDCIncrementalImporterTask implements Runnable {
diff --git a/kernel/data-pipeline/cdc/protocol/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/common/CDCResponseErrorCode.java b/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/exception/CDCExceptionWrapper.java
similarity index 72%
copy from kernel/data-pipeline/cdc/protocol/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/common/CDCResponseErrorCode.java
copy to kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/exception/CDCExceptionWrapper.java
index 2498f351d90..e4064c9b52c 100644
--- a/kernel/data-pipeline/cdc/protocol/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/common/CDCResponseErrorCode.java
+++ b/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/exception/CDCExceptionWrapper.java
@@ -15,23 +15,20 @@
* limitations under the License.
*/
-package org.apache.shardingsphere.data.pipeline.cdc.common;
+package org.apache.shardingsphere.data.pipeline.cdc.exception;
import lombok.Getter;
import lombok.RequiredArgsConstructor;
+import org.apache.shardingsphere.infra.util.exception.external.sql.ShardingSphereSQLException;
/**
- * CDC response error code.
+ * CDC exception wrapper.
*/
@RequiredArgsConstructor
-public enum CDCResponseErrorCode {
+@Getter
+public final class CDCExceptionWrapper extends RuntimeException {
- SERVER_ERROR("1"),
+ private final String requestId;
- ILLEGAL_REQUEST_ERROR("2"),
-
- ILLEGAL_USERNAME_OR_PASSWORD("3");
-
- @Getter
- private final String code;
+ private final ShardingSphereSQLException exception;
}
diff --git a/kernel/data-pipeline/cdc/protocol/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/common/CDCResponseErrorCode.java b/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/exception/CDCLoginException.java
similarity index 62%
copy from kernel/data-pipeline/cdc/protocol/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/common/CDCResponseErrorCode.java
copy to kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/exception/CDCLoginException.java
index 2498f351d90..fda62711db5 100644
--- a/kernel/data-pipeline/cdc/protocol/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/common/CDCResponseErrorCode.java
+++ b/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/exception/CDCLoginException.java
@@ -15,23 +15,17 @@
* limitations under the License.
*/
-package org.apache.shardingsphere.data.pipeline.cdc.common;
+package org.apache.shardingsphere.data.pipeline.cdc.exception;
-import lombok.Getter;
-import lombok.RequiredArgsConstructor;
+import org.apache.shardingsphere.data.pipeline.core.exception.PipelineSQLException;
+import org.apache.shardingsphere.infra.util.exception.external.sql.sqlstate.XOpenSQLState;
/**
- * CDC response error code.
+ * CDC login exception.
*/
-@RequiredArgsConstructor
-public enum CDCResponseErrorCode {
+public final class CDCLoginException extends PipelineSQLException {
- SERVER_ERROR("1"),
-
- ILLEGAL_REQUEST_ERROR("2"),
-
- ILLEGAL_USERNAME_OR_PASSWORD("3");
-
- @Getter
- private final String code;
+ public CDCLoginException(final String reason) {
+ super(XOpenSQLState.GENERAL_ERROR, 202, String.format("CDC login failed, reason is: %s", reason));
+ }
}
diff --git a/kernel/data-pipeline/cdc/protocol/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/common/CDCResponseErrorCode.java b/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/exception/CDCServerException.java
similarity index 61%
rename from kernel/data-pipeline/cdc/protocol/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/common/CDCResponseErrorCode.java
rename to kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/exception/CDCServerException.java
index 2498f351d90..11626806f1b 100644
--- a/kernel/data-pipeline/cdc/protocol/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/common/CDCResponseErrorCode.java
+++ b/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/exception/CDCServerException.java
@@ -15,23 +15,17 @@
* limitations under the License.
*/
-package org.apache.shardingsphere.data.pipeline.cdc.common;
+package org.apache.shardingsphere.data.pipeline.cdc.exception;
-import lombok.Getter;
-import lombok.RequiredArgsConstructor;
+import org.apache.shardingsphere.data.pipeline.core.exception.PipelineSQLException;
+import org.apache.shardingsphere.infra.util.exception.external.sql.sqlstate.XOpenSQLState;
/**
- * CDC response error code.
+ * CDC server exception.
*/
-@RequiredArgsConstructor
-public enum CDCResponseErrorCode {
+public final class CDCServerException extends PipelineSQLException {
- SERVER_ERROR("1"),
-
- ILLEGAL_REQUEST_ERROR("2"),
-
- ILLEGAL_USERNAME_OR_PASSWORD("3");
-
- @Getter
- private final String code;
+ public CDCServerException(final String reason) {
+ super(XOpenSQLState.GENERAL_ERROR, 201, String.format("CDC server exception, reason is: %s.", reason));
+ }
}
diff --git a/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/generator/CDCResponseGenerator.java b/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/generator/CDCResponseGenerator.java
index dc24261e26b..3466bd5e3b5 100644
--- a/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/generator/CDCResponseGenerator.java
+++ b/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/generator/CDCResponseGenerator.java
@@ -17,7 +17,6 @@
package org.apache.shardingsphere.data.pipeline.cdc.generator;
-import org.apache.shardingsphere.data.pipeline.cdc.common.CDCResponseErrorCode;
import org.apache.shardingsphere.data.pipeline.cdc.protocol.response.CDCResponse;
import org.apache.shardingsphere.data.pipeline.cdc.protocol.response.CDCResponse.Builder;
import org.apache.shardingsphere.data.pipeline.cdc.protocol.response.CDCResponse.Status;
@@ -45,7 +44,7 @@ public final class CDCResponseGenerator {
* @param errorMessage error message
* @return failed response
*/
- public static CDCResponse failed(final String requestId, final CDCResponseErrorCode errorCode, final String errorMessage) {
- return CDCResponse.newBuilder().setStatus(Status.FAILED).setRequestId(requestId).setErrorCode(errorCode.getCode()).setErrorMessage(errorMessage).build();
+ public static CDCResponse failed(final String requestId, final String errorCode, final String errorMessage) {
+ return CDCResponse.newBuilder().setStatus(Status.FAILED).setRequestId(requestId).setErrorCode(errorCode).setErrorMessage(errorMessage).build();
}
}
diff --git a/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/handler/cdc/CDCBackendHandler.java b/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/handler/cdc/CDCBackendHandler.java
index 0e513182ae6..07088a2818a 100644
--- a/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/handler/cdc/CDCBackendHandler.java
+++ b/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/handler/cdc/CDCBackendHandler.java
@@ -23,13 +23,14 @@ import lombok.extern.slf4j.Slf4j;
import org.apache.shardingsphere.data.pipeline.api.ingest.record.DataRecord;
import org.apache.shardingsphere.data.pipeline.cdc.api.impl.CDCJobAPI;
import org.apache.shardingsphere.data.pipeline.cdc.api.pojo.StreamDataParameter;
-import org.apache.shardingsphere.data.pipeline.cdc.common.CDCResponseErrorCode;
import org.apache.shardingsphere.data.pipeline.cdc.config.job.CDCJobConfiguration;
import org.apache.shardingsphere.data.pipeline.cdc.constant.CDCSinkType;
import org.apache.shardingsphere.data.pipeline.cdc.context.CDCConnectionContext;
import org.apache.shardingsphere.data.pipeline.cdc.core.ack.CDCAckHolder;
import org.apache.shardingsphere.data.pipeline.cdc.core.connector.SocketSinkImporterConnector;
import org.apache.shardingsphere.data.pipeline.cdc.core.job.CDCJob;
+import org.apache.shardingsphere.data.pipeline.cdc.exception.CDCExceptionWrapper;
+import org.apache.shardingsphere.data.pipeline.cdc.exception.CDCServerException;
import org.apache.shardingsphere.data.pipeline.cdc.exception.NotFindStreamDataSourceTableException;
import org.apache.shardingsphere.data.pipeline.cdc.generator.CDCResponseGenerator;
import org.apache.shardingsphere.data.pipeline.cdc.generator.DataRecordComparatorGenerator;
@@ -42,6 +43,7 @@ import org.apache.shardingsphere.data.pipeline.cdc.util.CDCSchemaTableUtil;
import org.apache.shardingsphere.data.pipeline.cdc.util.CDCTableRuleUtil;
import org.apache.shardingsphere.data.pipeline.core.api.PipelineAPIFactory;
import org.apache.shardingsphere.data.pipeline.core.context.PipelineContext;
+import org.apache.shardingsphere.data.pipeline.core.exception.job.PipelineJobNotFoundException;
import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobCenter;
import org.apache.shardingsphere.elasticjob.infra.pojo.JobConfigurationPOJO;
import org.apache.shardingsphere.elasticjob.lite.api.bootstrap.impl.OneOffJobBootstrap;
@@ -58,7 +60,6 @@ import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
-import java.util.Optional;
import java.util.Properties;
import java.util.Set;
import java.util.stream.Collectors;
@@ -83,7 +84,7 @@ public final class CDCBackendHandler {
public CDCResponse streamData(final String requestId, final StreamDataRequestBody requestBody, final CDCConnectionContext connectionContext, final Channel channel) {
ShardingSphereDatabase database = PipelineContext.getContextManager().getMetaDataContexts().getMetaData().getDatabase(requestBody.getDatabase());
if (null == database) {
- return CDCResponseGenerator.failed(requestId, CDCResponseErrorCode.SERVER_ERROR, String.format("%s database is not exists", requestBody.getDatabase()));
+ throw new CDCExceptionWrapper(requestId, new CDCServerException(String.format("%s database is not exists", requestBody.getDatabase())));
}
Map<String, Set<String>> schemaTableNameMap;
Collection<String> tableNames;
@@ -99,22 +100,19 @@ public final class CDCBackendHandler {
tableNames = schemaTableNames;
}
if (tableNames.isEmpty()) {
- throw new NotFindStreamDataSourceTableException();
- }
- Optional<ShardingRule> shardingRule = database.getRuleMetaData().findSingleRule(ShardingRule.class);
- if (!shardingRule.isPresent()) {
- return CDCResponseGenerator.failed(requestId, CDCResponseErrorCode.SERVER_ERROR, "Not find sharding rule");
+ throw new CDCExceptionWrapper(requestId, new NotFindStreamDataSourceTableException());
}
+ ShardingRule shardingRule = database.getRuleMetaData().getSingleRule(ShardingRule.class);
Map<String, List<DataNode>> actualDataNodesMap = new HashMap<>();
// TODO need support case-insensitive later
for (String each : tableNames) {
- actualDataNodesMap.put(each, CDCTableRuleUtil.getActualDataNodes(shardingRule.get(), each));
+ actualDataNodesMap.put(each, CDCTableRuleUtil.getActualDataNodes(shardingRule, each));
}
boolean decodeWithTx = database.getProtocolType() instanceof OpenGaussDatabaseType;
StreamDataParameter parameter = new StreamDataParameter(requestBody.getDatabase(), new LinkedList<>(schemaTableNames), requestBody.getFull(), actualDataNodesMap, decodeWithTx);
String jobId = jobAPI.createJob(parameter, CDCSinkType.SOCKET, new Properties());
connectionContext.setJobId(jobId);
- startStreaming(requestId, jobId, connectionContext, channel);
+ startStreaming(jobId, connectionContext, channel);
return CDCResponseGenerator.succeedBuilder(requestId).setStreamDataResult(StreamDataResult.newBuilder().setStreamingId(jobId).build()).build();
}
@@ -131,17 +129,17 @@ public final class CDCBackendHandler {
/**
* Start streaming.
*
- * @param requestId request id
* @param jobId job id
* @param channel channel
* @param connectionContext connection context
- * @return CDC response
*/
- // TODO not return CDCResponse
- public CDCResponse startStreaming(final String requestId, final String jobId, final CDCConnectionContext connectionContext, final Channel channel) {
+ public void startStreaming(final String jobId, final CDCConnectionContext connectionContext, final Channel channel) {
CDCJobConfiguration cdcJobConfig = jobAPI.getJobConfiguration(jobId);
if (null == cdcJobConfig) {
- return CDCResponseGenerator.failed(jobId, CDCResponseErrorCode.ILLEGAL_REQUEST_ERROR, String.format("the %s job config doesn't exist", jobId));
+ throw new PipelineJobNotFoundException(jobId);
+ }
+ if (PipelineJobCenter.isJobExisting(jobId)) {
+ PipelineJobCenter.stop(jobId);
}
JobConfigurationPOJO jobConfigPOJO = PipelineAPIFactory.getJobConfigurationAPI().getJobConfiguration(jobId);
// TODO, ensure that there is only one consumer at a time, job config disable may not be updated when the program is forced to close
@@ -152,12 +150,11 @@ public final class CDCBackendHandler {
? DataRecordComparatorGenerator.generatorIncrementalComparator(database.getProtocolType())
: null;
CDCJob job = new CDCJob(new SocketSinkImporterConnector(channel, database, cdcJobConfig.getJobShardingCount(), cdcJobConfig.getSchemaTableNames(), dataRecordComparator));
- PipelineJobCenter.addJob(jobConfigPOJO.getJobName(), job);
+ PipelineJobCenter.addJob(jobId, job);
OneOffJobBootstrap oneOffJobBootstrap = new OneOffJobBootstrap(PipelineAPIFactory.getRegistryCenter(), job, jobConfigPOJO.toJobConfiguration());
job.setJobBootstrap(oneOffJobBootstrap);
oneOffJobBootstrap.execute();
connectionContext.setJobId(jobId);
- return CDCResponseGenerator.succeedBuilder(requestId).build();
}
/**
diff --git a/proxy/frontend/core/src/main/java/org/apache/shardingsphere/proxy/frontend/netty/CDCChannelInboundHandler.java b/proxy/frontend/core/src/main/java/org/apache/shardingsphere/proxy/frontend/netty/CDCChannelInboundHandler.java
index bad92ede984..f48f2bfd42d 100644
--- a/proxy/frontend/core/src/main/java/org/apache/shardingsphere/proxy/frontend/netty/CDCChannelInboundHandler.java
+++ b/proxy/frontend/core/src/main/java/org/apache/shardingsphere/proxy/frontend/netty/CDCChannelInboundHandler.java
@@ -26,9 +26,11 @@ import io.netty.util.AttributeKey;
import lombok.extern.slf4j.Slf4j;
import org.apache.shardingsphere.authority.model.ShardingSpherePrivileges;
import org.apache.shardingsphere.authority.rule.AuthorityRule;
-import org.apache.shardingsphere.data.pipeline.cdc.common.CDCResponseErrorCode;
import org.apache.shardingsphere.data.pipeline.cdc.constant.CDCConnectionStatus;
import org.apache.shardingsphere.data.pipeline.cdc.context.CDCConnectionContext;
+import org.apache.shardingsphere.data.pipeline.cdc.exception.CDCExceptionWrapper;
+import org.apache.shardingsphere.data.pipeline.cdc.exception.CDCLoginException;
+import org.apache.shardingsphere.data.pipeline.cdc.exception.CDCServerException;
import org.apache.shardingsphere.data.pipeline.cdc.generator.CDCResponseGenerator;
import org.apache.shardingsphere.data.pipeline.cdc.protocol.request.AckStreamingRequestBody;
import org.apache.shardingsphere.data.pipeline.cdc.protocol.request.CDCRequest;
@@ -39,6 +41,7 @@ import org.apache.shardingsphere.data.pipeline.cdc.protocol.request.StopStreamin
import org.apache.shardingsphere.data.pipeline.cdc.protocol.request.StreamDataRequestBody;
import org.apache.shardingsphere.data.pipeline.cdc.protocol.response.CDCResponse;
import org.apache.shardingsphere.data.pipeline.cdc.protocol.response.ServerGreetingResult;
+import org.apache.shardingsphere.data.pipeline.core.exception.param.PipelineInvalidParameterException;
import org.apache.shardingsphere.distsql.handler.exception.rule.MissingRequiredRuleException;
import org.apache.shardingsphere.infra.autogen.version.ShardingSphereVersion;
import org.apache.shardingsphere.infra.executor.audit.exception.SQLAuditException;
@@ -46,6 +49,7 @@ import org.apache.shardingsphere.infra.metadata.user.Grantee;
import org.apache.shardingsphere.infra.metadata.user.ShardingSphereUser;
import org.apache.shardingsphere.infra.util.exception.ShardingSpherePreconditions;
import org.apache.shardingsphere.infra.util.exception.external.sql.ShardingSphereSQLException;
+import org.apache.shardingsphere.infra.util.exception.external.sql.sqlstate.XOpenSQLState;
import org.apache.shardingsphere.proxy.backend.context.ProxyContext;
import org.apache.shardingsphere.proxy.backend.handler.cdc.CDCBackendHandler;
@@ -90,12 +94,12 @@ public final class CDCChannelInboundHandler extends ChannelInboundHandlerAdapter
// TODO add CDC exception to wrapper this exception, and add the parameters requestId and whether to close connect
CDCConnectionContext connectionContext = ctx.channel().attr(CONNECTION_CONTEXT_KEY).get();
ChannelFuture channelFuture;
- if (cause instanceof ShardingSphereSQLException) {
- SQLException sqlException = ((ShardingSphereSQLException) cause).toSQLException();
- String errorMessage = String.format("ERROR %s (%s): %s", sqlException.getErrorCode(), sqlException.getSQLState(), sqlException.getMessage());
- channelFuture = ctx.writeAndFlush(CDCResponseGenerator.failed("", CDCResponseErrorCode.SERVER_ERROR, errorMessage));
+ if (cause instanceof CDCExceptionWrapper) {
+ CDCExceptionWrapper wrapper = (CDCExceptionWrapper) cause;
+ ShardingSphereSQLException exception = wrapper.getException();
+ channelFuture = ctx.writeAndFlush(CDCResponseGenerator.failed(wrapper.getRequestId(), exception.toSQLException().getSQLState(), exception.getMessage()));
} else {
- channelFuture = ctx.writeAndFlush(CDCResponseGenerator.failed("", CDCResponseErrorCode.SERVER_ERROR, cause.getMessage()));
+ channelFuture = ctx.writeAndFlush(CDCResponseGenerator.failed("", XOpenSQLState.GENERAL_ERROR.getValue(), cause.getMessage()));
}
if (CDCConnectionStatus.NOT_LOGGED_IN == connectionContext.getStatus()) {
channelFuture.addListener(ChannelFutureListener.CLOSE);
@@ -116,7 +120,7 @@ public final class CDCChannelInboundHandler extends ChannelInboundHandlerAdapter
processStreamDataRequest(ctx, request, connectionContext);
break;
case ACK_STREAMING:
- processAckStreamingRequest(ctx, request);
+ processAckStreamingRequest(request);
break;
case STOP_STREAMING:
processStopStreamingRequest(ctx, request, connectionContext);
@@ -134,8 +138,7 @@ public final class CDCChannelInboundHandler extends ChannelInboundHandlerAdapter
private void processLogin(final ChannelHandlerContext ctx, final CDCRequest request, final CDCConnectionContext connectionContext) {
if (!request.hasLoginRequestBody() || !request.getLoginRequestBody().hasBasicBody()) {
- ctx.writeAndFlush(CDCResponseGenerator.failed(request.getRequestId(), CDCResponseErrorCode.ILLEGAL_REQUEST_ERROR, "Miss login request body")).addListener(ChannelFutureListener.CLOSE);
- return;
+ throw new CDCExceptionWrapper(request.getRequestId(), new PipelineInvalidParameterException("Login request body is empty"));
}
BasicBody body = request.getLoginRequestBody().getBasicBody();
AuthorityRule authorityRule = ProxyContext.getInstance().getContextManager().getMetaDataContexts().getMetaData().getGlobalRuleMetaData().getSingleRule(AuthorityRule.class);
@@ -145,17 +148,17 @@ public final class CDCChannelInboundHandler extends ChannelInboundHandlerAdapter
connectionContext.setCurrentUser(user.get());
ctx.writeAndFlush(CDCResponseGenerator.succeedBuilder(request.getRequestId()).build());
} else {
- ctx.writeAndFlush(CDCResponseGenerator.failed(request.getRequestId(), CDCResponseErrorCode.ILLEGAL_USERNAME_OR_PASSWORD, "Illegal username or password"))
- .addListener(ChannelFutureListener.CLOSE);
+ throw new CDCExceptionWrapper(request.getRequestId(), new CDCLoginException("Illegal username or password"));
}
}
- private void checkPrivileges(final Grantee grantee, final String currentDatabase) {
+ private void checkPrivileges(final String requestId, final Grantee grantee, final String currentDatabase) {
AuthorityRule authorityRule = ProxyContext.getInstance().getContextManager().getMetaDataContexts().getMetaData().getGlobalRuleMetaData().findSingleRule(AuthorityRule.class)
- .orElseThrow(() -> new MissingRequiredRuleException("authority"));
+ .orElseThrow(() -> new CDCExceptionWrapper(requestId, new MissingRequiredRuleException("authority")));
ShardingSpherePrivileges privileges = authorityRule.findPrivileges(grantee)
- .orElseThrow(() -> new SQLAuditException(String.format("Access denied for user '%s'@'%s'", grantee.getUsername(), grantee.getHostname())));
- ShardingSpherePreconditions.checkState(privileges.hasPrivileges(currentDatabase), () -> new SQLAuditException(String.format("Unknown database '%s'", currentDatabase)));
+ .orElseThrow(() -> new CDCExceptionWrapper(requestId, new SQLAuditException(String.format("Access denied for user '%s'@'%s'", grantee.getUsername(), grantee.getHostname()))));
+ ShardingSpherePreconditions.checkState(privileges.hasPrivileges(currentDatabase),
+ () -> new CDCExceptionWrapper(requestId, new SQLAuditException(String.format("Unknown database '%s'", currentDatabase))));
}
private String getHostAddress(final ChannelHandlerContext context) {
@@ -165,59 +168,49 @@ public final class CDCChannelInboundHandler extends ChannelInboundHandlerAdapter
private void processStreamDataRequest(final ChannelHandlerContext ctx, final CDCRequest request, final CDCConnectionContext connectionContext) {
if (!request.hasStreamDataRequestBody()) {
- ctx.writeAndFlush(CDCResponseGenerator.failed(request.getRequestId(), CDCResponseErrorCode.ILLEGAL_REQUEST_ERROR, "Miss stream data request body"));
- return;
+ throw new CDCExceptionWrapper(request.getRequestId(), new PipelineInvalidParameterException("Stream data request body is empty"));
}
StreamDataRequestBody requestBody = request.getStreamDataRequestBody();
if (requestBody.getDatabase().isEmpty()) {
- ctx.writeAndFlush(CDCResponseGenerator.failed(request.getRequestId(), CDCResponseErrorCode.ILLEGAL_REQUEST_ERROR, "database not allowed to be empty"));
- return;
+ throw new CDCExceptionWrapper(request.getRequestId(), new PipelineInvalidParameterException("Database is empty"));
}
if (requestBody.getSourceSchemaTableList().isEmpty()) {
- ctx.writeAndFlush(CDCResponseGenerator.failed(request.getRequestId(), CDCResponseErrorCode.ILLEGAL_REQUEST_ERROR, "Illegal stream data request parameter"));
- return;
+ throw new CDCExceptionWrapper(request.getRequestId(), new PipelineInvalidParameterException("Source schema table is empty"));
}
- checkPrivileges(connectionContext.getCurrentUser().getGrantee(), requestBody.getDatabase());
+ checkPrivileges(request.getRequestId(), connectionContext.getCurrentUser().getGrantee(), requestBody.getDatabase());
CDCResponse response = backendHandler.streamData(request.getRequestId(), requestBody, connectionContext, ctx.channel());
ctx.writeAndFlush(response);
}
- private void processAckStreamingRequest(final ChannelHandlerContext ctx, final CDCRequest request) {
+ private void processAckStreamingRequest(final CDCRequest request) {
if (!request.hasAckStreamingRequestBody()) {
- ctx.writeAndFlush(CDCResponseGenerator.failed(request.getRequestId(), CDCResponseErrorCode.ILLEGAL_REQUEST_ERROR, "Miss ack request body")).addListener(ChannelFutureListener.CLOSE);
- return;
+ throw new CDCExceptionWrapper(request.getRequestId(), new PipelineInvalidParameterException("Ack request body is empty"));
}
AckStreamingRequestBody requestBody = request.getAckStreamingRequestBody();
if (requestBody.getAckId().isEmpty()) {
- ctx.writeAndFlush(CDCResponseGenerator.failed(request.getRequestId(), CDCResponseErrorCode.ILLEGAL_REQUEST_ERROR, "Illegal ack request parameter"));
- return;
+ throw new CDCExceptionWrapper(request.getRequestId(), new PipelineInvalidParameterException("Ack request is empty"));
}
backendHandler.processAck(requestBody);
}
private void processStartStreamingRequest(final ChannelHandlerContext ctx, final CDCRequest request, final CDCConnectionContext connectionContext) {
if (!request.hasStartStreamingRequestBody()) {
- ctx.writeAndFlush(CDCResponseGenerator.failed(request.getRequestId(), CDCResponseErrorCode.ILLEGAL_REQUEST_ERROR, "Miss start streaming request body"))
- .addListener(ChannelFutureListener.CLOSE);
- return;
+ throw new CDCExceptionWrapper(request.getRequestId(), new PipelineInvalidParameterException("Start streaming request body is empty"));
}
StartStreamingRequestBody requestBody = request.getStartStreamingRequestBody();
- // TODO improve after cdc exception refactor
if (requestBody.getStreamingId().isEmpty()) {
- ctx.writeAndFlush(CDCResponseGenerator.failed(request.getRequestId(), CDCResponseErrorCode.ILLEGAL_REQUEST_ERROR, "Illegal start streaming request parameter"))
- .addListener(ChannelFutureListener.CLOSE);
- return;
+ throw new CDCExceptionWrapper(request.getRequestId(), new PipelineInvalidParameterException("Streaming id is empty"));
}
String database = backendHandler.getDatabaseNameByJobId(requestBody.getStreamingId());
- checkPrivileges(connectionContext.getCurrentUser().getGrantee(), database);
- CDCResponse response = backendHandler.startStreaming(request.getRequestId(), requestBody.getStreamingId(), connectionContext, ctx.channel());
- ctx.writeAndFlush(response);
+ checkPrivileges(request.getRequestId(), connectionContext.getCurrentUser().getGrantee(), database);
+ backendHandler.startStreaming(requestBody.getStreamingId(), connectionContext, ctx.channel());
+ ctx.writeAndFlush(CDCResponseGenerator.succeedBuilder(request.getRequestId()).build());
}
private void processStopStreamingRequest(final ChannelHandlerContext ctx, final CDCRequest request, final CDCConnectionContext connectionContext) {
StopStreamingRequestBody requestBody = request.getStopStreamingRequestBody();
String database = backendHandler.getDatabaseNameByJobId(requestBody.getStreamingId());
- checkPrivileges(connectionContext.getCurrentUser().getGrantee(), database);
+ checkPrivileges(request.getRequestId(), connectionContext.getCurrentUser().getGrantee(), database);
backendHandler.stopStreaming(connectionContext.getJobId());
connectionContext.setStatus(CDCConnectionStatus.LOGGED_IN);
connectionContext.setJobId(null);
@@ -227,14 +220,14 @@ public final class CDCChannelInboundHandler extends ChannelInboundHandlerAdapter
private void processDropStreamingRequest(final ChannelHandlerContext ctx, final CDCRequest request, final CDCConnectionContext connectionContext) {
DropStreamingRequestBody requestBody = request.getDropStreamingRequestBody();
String database = backendHandler.getDatabaseNameByJobId(requestBody.getStreamingId());
- checkPrivileges(connectionContext.getCurrentUser().getGrantee(), database);
+ checkPrivileges(request.getRequestId(), connectionContext.getCurrentUser().getGrantee(), database);
try {
backendHandler.dropStreaming(connectionContext.getJobId());
connectionContext.setStatus(CDCConnectionStatus.LOGGED_IN);
connectionContext.setJobId(null);
ctx.writeAndFlush(CDCResponseGenerator.succeedBuilder(request.getRequestId()).build());
} catch (final SQLException ex) {
- ctx.writeAndFlush(CDCResponseGenerator.failed(request.getRequestId(), CDCResponseErrorCode.SERVER_ERROR, ex.getMessage()));
+ throw new CDCExceptionWrapper(request.getRequestId(), new CDCServerException(ex.getMessage()));
}
}
}
diff --git a/proxy/frontend/core/src/test/java/org/apache/shardingsphere/proxy/frontend/netty/CDCChannelInboundHandlerTest.java b/proxy/frontend/core/src/test/java/org/apache/shardingsphere/proxy/frontend/netty/CDCChannelInboundHandlerTest.java
index 69c5a284d06..a71dd514145 100644
--- a/proxy/frontend/core/src/test/java/org/apache/shardingsphere/proxy/frontend/netty/CDCChannelInboundHandlerTest.java
+++ b/proxy/frontend/core/src/test/java/org/apache/shardingsphere/proxy/frontend/netty/CDCChannelInboundHandlerTest.java
@@ -21,7 +21,6 @@ import com.google.common.hash.Hashing;
import io.netty.channel.embedded.EmbeddedChannel;
import io.netty.handler.logging.LoggingHandler;
import org.apache.shardingsphere.authority.rule.AuthorityRule;
-import org.apache.shardingsphere.data.pipeline.cdc.common.CDCResponseErrorCode;
import org.apache.shardingsphere.data.pipeline.cdc.protocol.request.CDCRequest;
import org.apache.shardingsphere.data.pipeline.cdc.protocol.request.CDCRequest.Builder;
import org.apache.shardingsphere.data.pipeline.cdc.protocol.request.CDCRequest.Type;
@@ -31,6 +30,7 @@ import org.apache.shardingsphere.data.pipeline.cdc.protocol.response.CDCResponse
import org.apache.shardingsphere.data.pipeline.cdc.protocol.response.CDCResponse.Status;
import org.apache.shardingsphere.infra.metadata.database.rule.ShardingSphereRuleMetaData;
import org.apache.shardingsphere.infra.metadata.user.ShardingSphereUser;
+import org.apache.shardingsphere.infra.util.exception.external.sql.sqlstate.XOpenSQLState;
import org.apache.shardingsphere.mode.manager.ContextManager;
import org.apache.shardingsphere.proxy.backend.context.ProxyContext;
import org.apache.shardingsphere.test.mock.AutoMockExtension;
@@ -88,7 +88,7 @@ public final class CDCChannelInboundHandlerTest {
assertTrue(expectedGreetingResult.hasServerGreetingResult());
CDCResponse expectedLoginResult = channel.readOutbound();
assertThat(expectedLoginResult.getStatus(), is(Status.FAILED));
- assertThat(expectedLoginResult.getErrorCode(), is(CDCResponseErrorCode.ILLEGAL_USERNAME_OR_PASSWORD.getCode()));
+ assertThat(expectedLoginResult.getErrorCode(), is(XOpenSQLState.GENERAL_ERROR.getValue()));
assertFalse(channel.isOpen());
}
@@ -100,7 +100,7 @@ public final class CDCChannelInboundHandlerTest {
assertTrue(expectedGreetingResult.hasServerGreetingResult());
CDCResponse expectedLoginResult = channel.readOutbound();
assertThat(expectedLoginResult.getStatus(), is(Status.FAILED));
- assertThat(expectedLoginResult.getErrorCode(), is(CDCResponseErrorCode.ILLEGAL_REQUEST_ERROR.getCode()));
+ assertThat(expectedLoginResult.getErrorCode(), is(XOpenSQLState.INVALID_PARAMETER_VALUE.getValue()));
assertFalse(channel.isOpen());
}