You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@shardingsphere.apache.org by "azexcy (via GitHub)" <gi...@apache.org> on 2023/03/24 06:19:24 UTC

[GitHub] [shardingsphere] azexcy opened a new pull request, #24798: Improve

azexcy opened a new pull request, #24798:
URL: https://github.com/apache/shardingsphere/pull/24798

   Relate #22500.
   
   Changes proposed in this pull request:
     - Refactor CDC exception module
     - Add more exceptions
     - Update error code doc.
   
   ---
   
   Before committing this PR, I'm sure that I have checked the following options:
   - [ ] My code follows the [code of conduct](https://shardingsphere.apache.org/community/en/involved/conduct/code/) of this project.
   - [ ] I have self-reviewed the commit code.
   - [ ] I have (or in comment I request) added corresponding labels for the pull request.
   - [ ] I have passed maven check locally : `./mvnw clean install -B -T1C -Dmaven.javadoc.skip -Dmaven.jacoco.skip -e`.
   - [ ] I have made corresponding changes to the documentation.
   - [ ] I have added corresponding unit tests for my changes.
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@shardingsphere.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [shardingsphere] sandynz merged pull request #24798: Refactor CDC exception module

Posted by "sandynz (via GitHub)" <gi...@apache.org>.
sandynz merged PR #24798:
URL: https://github.com/apache/shardingsphere/pull/24798


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@shardingsphere.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [shardingsphere] sandynz commented on a diff in pull request #24798: Refactor CDC exception module

Posted by "sandynz (via GitHub)" <gi...@apache.org>.
sandynz commented on code in PR #24798:
URL: https://github.com/apache/shardingsphere/pull/24798#discussion_r1147189499


##########
proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/handler/cdc/CDCBackendHandler.java:
##########
@@ -131,17 +130,17 @@ public String getDatabaseNameByJobId(final String jobId) {
     /**
      * Start streaming.
      *
-     * @param requestId request id
      * @param jobId job id
      * @param channel channel
      * @param connectionContext connection context
-     * @return CDC response
      */
-    // TODO not return CDCResponse
-    public CDCResponse startStreaming(final String requestId, final String jobId, final CDCConnectionContext connectionContext, final Channel channel) {
+    public void startStreaming(final String jobId, final CDCConnectionContext connectionContext, final Channel channel) {
         CDCJobConfiguration cdcJobConfig = jobAPI.getJobConfiguration(jobId);
         if (null == cdcJobConfig) {
-            return CDCResponseGenerator.failed(jobId, CDCResponseErrorCode.ILLEGAL_REQUEST_ERROR, String.format("the %s job config doesn't exist", jobId));
+            throw new PipelineJobNotFoundException(jobId);
+        }
+        if (PipelineJobCenter.isJobExisting(jobId)) {
+            throw new PipelineJobHasAlreadyStartedException(jobId);
         }

Review Comment:
   If existing job is stopping, then we could not just throw exception



##########
proxy/frontend/core/src/main/java/org/apache/shardingsphere/proxy/frontend/netty/CDCChannelInboundHandler.java:
##########
@@ -165,59 +168,49 @@ private String getHostAddress(final ChannelHandlerContext context) {
     
     private void processStreamDataRequest(final ChannelHandlerContext ctx, final CDCRequest request, final CDCConnectionContext connectionContext) {
         if (!request.hasStreamDataRequestBody()) {
-            ctx.writeAndFlush(CDCResponseGenerator.failed(request.getRequestId(), CDCResponseErrorCode.ILLEGAL_REQUEST_ERROR, "Miss stream data request body"));
-            return;
+            throw CDCExceptionWrapper.wrapper(request.getRequestId(), new PipelineInvalidParameterException("stream data request body is empty"));
         }
         StreamDataRequestBody requestBody = request.getStreamDataRequestBody();
         if (requestBody.getDatabase().isEmpty()) {
-            ctx.writeAndFlush(CDCResponseGenerator.failed(request.getRequestId(), CDCResponseErrorCode.ILLEGAL_REQUEST_ERROR, "database not allowed to be empty"));
-            return;
+            throw CDCExceptionWrapper.wrapper(request.getRequestId(), new PipelineInvalidParameterException("database is empty"));
         }
         if (requestBody.getSourceSchemaTableList().isEmpty()) {
-            ctx.writeAndFlush(CDCResponseGenerator.failed(request.getRequestId(), CDCResponseErrorCode.ILLEGAL_REQUEST_ERROR, "Illegal stream data request parameter"));
-            return;
+            throw CDCExceptionWrapper.wrapper(request.getRequestId(), new PipelineInvalidParameterException("stream data request is empty"));
         }

Review Comment:
   Error message starting letter could be uppercase, and also other ones



##########
proxy/frontend/core/src/main/java/org/apache/shardingsphere/proxy/frontend/netty/CDCChannelInboundHandler.java:
##########
@@ -165,59 +168,49 @@ private String getHostAddress(final ChannelHandlerContext context) {
     
     private void processStreamDataRequest(final ChannelHandlerContext ctx, final CDCRequest request, final CDCConnectionContext connectionContext) {
         if (!request.hasStreamDataRequestBody()) {
-            ctx.writeAndFlush(CDCResponseGenerator.failed(request.getRequestId(), CDCResponseErrorCode.ILLEGAL_REQUEST_ERROR, "Miss stream data request body"));
-            return;
+            throw CDCExceptionWrapper.wrapper(request.getRequestId(), new PipelineInvalidParameterException("stream data request body is empty"));
         }
         StreamDataRequestBody requestBody = request.getStreamDataRequestBody();
         if (requestBody.getDatabase().isEmpty()) {
-            ctx.writeAndFlush(CDCResponseGenerator.failed(request.getRequestId(), CDCResponseErrorCode.ILLEGAL_REQUEST_ERROR, "database not allowed to be empty"));
-            return;
+            throw CDCExceptionWrapper.wrapper(request.getRequestId(), new PipelineInvalidParameterException("database is empty"));
         }
         if (requestBody.getSourceSchemaTableList().isEmpty()) {
-            ctx.writeAndFlush(CDCResponseGenerator.failed(request.getRequestId(), CDCResponseErrorCode.ILLEGAL_REQUEST_ERROR, "Illegal stream data request parameter"));
-            return;
+            throw CDCExceptionWrapper.wrapper(request.getRequestId(), new PipelineInvalidParameterException("stream data request is empty"));
         }

Review Comment:
   Error message doesn't match the if condition block



##########
kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/exception/CDCExceptionWrapper.java:
##########
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.data.pipeline.cdc.exception;
+
+import lombok.Getter;
+import lombok.RequiredArgsConstructor;
+import org.apache.shardingsphere.infra.util.exception.external.sql.ShardingSphereSQLException;
+
+/**
+ * CDC exception wrapper.
+ */
+@RequiredArgsConstructor
+@Getter
+public final class CDCExceptionWrapper extends RuntimeException {
+    
+    private final String requestId;
+    
+    private final ShardingSphereSQLException exception;
+    
+    /**
+     * Wrapper the exception.
+     *
+     * @param requestId request id
+     * @param exception ShardingSphereCDCException
+     * @return Wrapped ShardingSphereCDCException
+     */
+    public static CDCExceptionWrapper wrapper(final String requestId, final ShardingSphereSQLException exception) {
+        return new CDCExceptionWrapper(requestId, exception);
+    }

Review Comment:
   `wrapper` method is not required, we could use constructor



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@shardingsphere.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [shardingsphere] sandynz commented on a diff in pull request #24798: Refactor CDC exception module

Posted by "sandynz (via GitHub)" <gi...@apache.org>.
sandynz commented on code in PR #24798:
URL: https://github.com/apache/shardingsphere/pull/24798#discussion_r1147245620


##########
proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/handler/cdc/CDCBackendHandler.java:
##########
@@ -131,17 +130,17 @@ public String getDatabaseNameByJobId(final String jobId) {
     /**
      * Start streaming.
      *
-     * @param requestId request id
      * @param jobId job id
      * @param channel channel
      * @param connectionContext connection context
-     * @return CDC response
      */
-    // TODO not return CDCResponse
-    public CDCResponse startStreaming(final String requestId, final String jobId, final CDCConnectionContext connectionContext, final Channel channel) {
+    public void startStreaming(final String jobId, final CDCConnectionContext connectionContext, final Channel channel) {
         CDCJobConfiguration cdcJobConfig = jobAPI.getJobConfiguration(jobId);
         if (null == cdcJobConfig) {
-            return CDCResponseGenerator.failed(jobId, CDCResponseErrorCode.ILLEGAL_REQUEST_ERROR, String.format("the %s job config doesn't exist", jobId));
+            throw new PipelineJobNotFoundException(jobId);
+        }
+        if (PipelineJobCenter.isJobExisting(jobId)) {
+            throw new PipelineJobHasAlreadyStartedException(jobId);
         }

Review Comment:
   It will be improve in another PR. Since it's common requirement to wait job stopping synchronously



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@shardingsphere.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [shardingsphere] sandynz commented on a diff in pull request #24798: Refactor CDC exception module

Posted by "sandynz (via GitHub)" <gi...@apache.org>.
sandynz commented on code in PR #24798:
URL: https://github.com/apache/shardingsphere/pull/24798#discussion_r1147245620


##########
proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/handler/cdc/CDCBackendHandler.java:
##########
@@ -131,17 +130,17 @@ public String getDatabaseNameByJobId(final String jobId) {
     /**
      * Start streaming.
      *
-     * @param requestId request id
      * @param jobId job id
      * @param channel channel
      * @param connectionContext connection context
-     * @return CDC response
      */
-    // TODO not return CDCResponse
-    public CDCResponse startStreaming(final String requestId, final String jobId, final CDCConnectionContext connectionContext, final Channel channel) {
+    public void startStreaming(final String jobId, final CDCConnectionContext connectionContext, final Channel channel) {
         CDCJobConfiguration cdcJobConfig = jobAPI.getJobConfiguration(jobId);
         if (null == cdcJobConfig) {
-            return CDCResponseGenerator.failed(jobId, CDCResponseErrorCode.ILLEGAL_REQUEST_ERROR, String.format("the %s job config doesn't exist", jobId));
+            throw new PipelineJobNotFoundException(jobId);
+        }
+        if (PipelineJobCenter.isJobExisting(jobId)) {
+            throw new PipelineJobHasAlreadyStartedException(jobId);
         }

Review Comment:
   Need to stop and then start it.
   
   And it will be improved in another PR. Since it's common requirement to wait job stopping synchronously.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@shardingsphere.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org