You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@shardingsphere.apache.org by "azexcy (via GitHub)" <gi...@apache.org> on 2023/02/16 09:32:08 UTC

[GitHub] [shardingsphere] azexcy commented on a diff in pull request #24194: Improve CDC protocol

azexcy commented on code in PR #24194:
URL: https://github.com/apache/shardingsphere/pull/24194#discussion_r1108224217


##########
proxy/frontend/core/src/main/java/org/apache/shardingsphere/proxy/frontend/netty/CDCChannelInboundHandler.java:
##########
@@ -164,69 +163,79 @@ private String getHostAddress(final ChannelHandlerContext context) {
         return socketAddress instanceof InetSocketAddress ? ((InetSocketAddress) socketAddress).getAddress().getHostAddress() : socketAddress.toString();
     }
     
-    private void processCreateSubscription(final ChannelHandlerContext ctx, final CDCRequest request, final CDCConnectionContext connectionContext) {
-        if (!request.hasCreateSubscription()) {
-            ctx.writeAndFlush(CDCResponseGenerator.failed(request.getRequestId(), CDCResponseErrorCode.ILLEGAL_REQUEST_ERROR, "Miss create subscription request body"))
-                    .addListener(ChannelFutureListener.CLOSE);
+    private void processStreamDataRequest(final ChannelHandlerContext ctx, final CDCRequest request, final CDCConnectionContext connectionContext) {
+        if (!request.hasStreamDataRequestBody()) {
+            ctx.writeAndFlush(CDCResponseGenerator.failed(request.getRequestId(), CDCResponseErrorCode.ILLEGAL_REQUEST_ERROR, "Miss stream data request body"));
+            return;
+        }
+        StreamDataRequestBody streamDataRequestBody = request.getStreamDataRequestBody();
+        if (streamDataRequestBody.getDatabase().isEmpty()) {
+            ctx.writeAndFlush(CDCResponseGenerator.failed(request.getRequestId(), CDCResponseErrorCode.ILLEGAL_REQUEST_ERROR, "database not allowed to be empty"));
             return;
         }
-        CreateSubscriptionRequest createSubscriptionRequest = request.getCreateSubscription();
-        if (createSubscriptionRequest.getTableNamesList().isEmpty() || createSubscriptionRequest.getDatabase().isEmpty() || createSubscriptionRequest.getSubscriptionName().isEmpty()
-                || createSubscriptionRequest.getSubscriptionMode() == SubscriptionMode.UNKNOWN) {
-            ctx.writeAndFlush(CDCResponseGenerator.failed(request.getRequestId(), CDCResponseErrorCode.ILLEGAL_REQUEST_ERROR, "Illegal create subscription request parameter"));
+        // TODO need support the all tables at database or schema
+        if (streamDataRequestBody.getSourceSchemaTablesList().isEmpty()) {
+            ctx.writeAndFlush(CDCResponseGenerator.failed(request.getRequestId(), CDCResponseErrorCode.ILLEGAL_REQUEST_ERROR, "Illegal stream data request parameter"));
             return;
         }
-        checkPrivileges(connectionContext.getCurrentUser().getGrantee(), createSubscriptionRequest.getDatabase());
-        CDCResponse response = backendHandler.createSubscription(request);
+        checkPrivileges(connectionContext.getCurrentUser().getGrantee(), streamDataRequestBody.getDatabase());
+        CDCResponse response = backendHandler.streamData(request.getRequestId(), streamDataRequestBody, connectionContext, ctx.channel());
         ctx.writeAndFlush(response);
     }
     
-    private void processStartSubscription(final ChannelHandlerContext ctx, final CDCRequest request, final CDCConnectionContext connectionContext) {
-        if (!request.hasStartSubscription()) {
-            ctx.writeAndFlush(CDCResponseGenerator.failed(request.getRequestId(), CDCResponseErrorCode.ILLEGAL_REQUEST_ERROR, "Miss start subscription request body"))
+    private void processStartStreamingRequest(final ChannelHandlerContext ctx, final CDCRequest request, final CDCConnectionContext connectionContext) {
+        if (!request.hasStartStreamingRequestBody()) {
+            ctx.writeAndFlush(CDCResponseGenerator.failed(request.getRequestId(), CDCResponseErrorCode.ILLEGAL_REQUEST_ERROR, "Miss start streaming request body"))
                     .addListener(ChannelFutureListener.CLOSE);
             return;
         }
-        StartSubscriptionRequest startSubscriptionRequest = request.getStartSubscription();
-        if (startSubscriptionRequest.getDatabase().isEmpty() || startSubscriptionRequest.getSubscriptionName().isEmpty()) {
-            ctx.writeAndFlush(CDCResponseGenerator.failed(request.getRequestId(), CDCResponseErrorCode.ILLEGAL_REQUEST_ERROR, "Illegal start subscription request parameter"))
+        StartStreamingRequestBody startStreamingRequestBody = request.getStartStreamingRequestBody();
+        // TODO improve after cdc exception refactor
+        if (startStreamingRequestBody.getStreamingId().isEmpty()) {
+            ctx.writeAndFlush(CDCResponseGenerator.failed(request.getRequestId(), CDCResponseErrorCode.ILLEGAL_REQUEST_ERROR, "Illegal start streaming request parameter"))
                     .addListener(ChannelFutureListener.CLOSE);
             return;
         }
-        checkPrivileges(connectionContext.getCurrentUser().getGrantee(), startSubscriptionRequest.getDatabase());
-        CDCResponse response = backendHandler.startSubscription(request, ctx.channel(), connectionContext);
+        String database = backendHandler.getDatabaseByJobId(startStreamingRequestBody.getStreamingId());
+        checkPrivileges(connectionContext.getCurrentUser().getGrantee(), database);
+        CDCResponse response = backendHandler.startStreaming(request.getRequestId(), startStreamingRequestBody.getStreamingId(), connectionContext, ctx.channel());
         ctx.writeAndFlush(response);
     }
     
-    private void processStopSubscription(final ChannelHandlerContext ctx, final CDCRequest request, final CDCConnectionContext connectionContext) {
-        StopSubscriptionRequest stopSubscriptionRequest = request.getStopSubscription();
-        checkPrivileges(connectionContext.getCurrentUser().getGrantee(), stopSubscriptionRequest.getDatabase());
-        backendHandler.stopSubscription(connectionContext.getJobId());
+    private void processStopStreamingRequest(final ChannelHandlerContext ctx, final CDCRequest request, final CDCConnectionContext connectionContext) {
+        StopStreamingRequestBody stopStreamingRequestBody = request.getStopStreamingRequestBody();
+        String database = backendHandler.getDatabaseByJobId(stopStreamingRequestBody.getStreamingId());
+        checkPrivileges(connectionContext.getCurrentUser().getGrantee(), database);
+        backendHandler.stopStreaming(connectionContext.getJobId());
         connectionContext.setStatus(CDCConnectionStatus.LOGGED_IN);
+        connectionContext.setJobId(null);
         ctx.writeAndFlush(CDCResponseGenerator.succeedBuilder(request.getRequestId()).build());
     }
     
-    private void processDropSubscription(final ChannelHandlerContext ctx, final CDCRequest request, final CDCConnectionContext connectionContext) {
-        DropSubscriptionRequest dropSubscriptionRequest = request.getDropSubscription();
-        checkPrivileges(connectionContext.getCurrentUser().getGrantee(), dropSubscriptionRequest.getDatabase());
+    private void processDropStreamingRequest(final ChannelHandlerContext ctx, final CDCRequest request, final CDCConnectionContext connectionContext) {
+        DropStreamingRequestBody dropStreamingRequestBody = request.getDropStreamingRequestBody();
+        String database = backendHandler.getDatabaseByJobId(dropStreamingRequestBody.getStreamingId());
+        checkPrivileges(connectionContext.getCurrentUser().getGrantee(), database);
         try {
-            backendHandler.dropSubscription(connectionContext.getJobId());
+            backendHandler.dropStreaming(connectionContext.getJobId());
+            connectionContext.setStatus(CDCConnectionStatus.LOGGED_IN);

Review Comment:
   Remove `STREAMING`, this doesn't seem to be needed at the moment



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@shardingsphere.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org