You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@shardingsphere.apache.org by "sandynz (via GitHub)" <gi...@apache.org> on 2023/02/16 12:42:15 UTC

[GitHub] [shardingsphere] sandynz commented on a diff in pull request #24194: Improve CDC protocol

sandynz commented on code in PR #24194:
URL: https://github.com/apache/shardingsphere/pull/24194#discussion_r1108416678


##########
kernel/data-pipeline/cdc/protocol/src/main/proto/CDCRequestProtocol.proto:
##########
@@ -60,38 +60,28 @@ message LoginRequest {
   }
 }
 
-message CreateSubscriptionRequest {
+message StreamDataRequestBody {
   string database = 1;
-  message TableName {
+  message SchemaTable {
     string schema = 1;
-    string name = 2;
+    string table = 2;
   }
-  repeated TableName table_names = 2;
-  string subscription_name = 3;
-  enum SubscriptionMode {
-    UNKNOWN = 0;
-    INCREMENTAL = 1;
-    FULL = 2;
-  }
-  SubscriptionMode subscription_mode = 4;
-  bool incremental_global_orderly = 5;
+  repeated SchemaTable source_schema_tables = 2;
+  bool full = 3;
 }
 
-message StartSubscriptionRequest {
-  string database = 1;
-  string subscription_name = 2;
+message AckStreamingRequestBody {
+  string ack_id = 3;
 }
 
-message StopSubscriptionRequest {
-  string database = 1;
-  string subscription_name = 2;
+message StopStreamingRequestBody {
+  string streaming_id = 2;
 }
 
-message DropSubscriptionRequest {
-  string database = 1;
-  string subscription_name = 2;
+message StartStreamingRequestBody {
+  string streaming_id = 2;
 }
 
-message AckRequest {
-  string ack_id = 3;
+message DropStreamingRequestBody {
+  string streaming_id = 2;
 }

Review Comment:
   `streaming_id = 2` could be `streaming_id = 1` in the stop/start/drop request bodies, since `streaming_id` is the only field in each of those messages.



##########
kernel/data-pipeline/cdc/protocol/src/main/proto/CDCRequestProtocol.proto:
##########
@@ -60,38 +60,28 @@ message LoginRequest {
   }
 }
 
-message CreateSubscriptionRequest {
+message StreamDataRequestBody {
   string database = 1;
-  message TableName {
+  message SchemaTable {
     string schema = 1;
-    string name = 2;
+    string table = 2;
   }
-  repeated TableName table_names = 2;
-  string subscription_name = 3;
-  enum SubscriptionMode {
-    UNKNOWN = 0;
-    INCREMENTAL = 1;
-    FULL = 2;
-  }
-  SubscriptionMode subscription_mode = 4;
-  bool incremental_global_orderly = 5;
+  repeated SchemaTable source_schema_tables = 2;
+  bool full = 3;
 }
 
-message StartSubscriptionRequest {
-  string database = 1;
-  string subscription_name = 2;
+message AckStreamingRequestBody {
+  string ack_id = 3;

Review Comment:
   `ack_id = 3` could be `ack_id = 1`, since `ack_id` is the only field in `AckStreamingRequestBody`.



##########
proxy/backend/src/main/java/org/apache/shardingsphere/proxy/backend/handler/cdc/CDCBackendHandler.java:
##########
@@ -67,39 +64,38 @@ public final class CDCBackendHandler {
     private final CDCJobAPI jobAPI = new CDCJobAPI();
     
     /**
-     * Create subscription.
+     * Stream data.
      *
-     * @param request CDC request
+     * @param requestId request id
+     * @param streamDataRequestBody stream data request body
+     * @param connectionContext connection context
+     * @param channel channel
      * @return CDC response
      */
-    public CDCResponse createSubscription(final CDCRequest request) {
-        CreateSubscriptionRequest createSubscription = request.getCreateSubscription();
-        ShardingSphereDatabase database = PipelineContext.getContextManager().getMetaDataContexts().getMetaData().getDatabase(createSubscription.getDatabase());
+    public CDCResponse streamData(final String requestId, final StreamDataRequestBody streamDataRequestBody, final CDCConnectionContext connectionContext, final Channel channel) {

Review Comment:
   The `streamDataRequestBody` parameter could be renamed to `requestBody`.



##########
proxy/backend/src/main/java/org/apache/shardingsphere/proxy/backend/handler/cdc/CDCBackendHandler.java:
##########
@@ -173,7 +177,7 @@ public void dropSubscription(final String jobId) throws SQLException {
      *
      * @param ackRequest ack request
      */
-    public void processAck(final AckRequest ackRequest) {
+    public void processAck(final AckStreamingRequestBody ackRequest) {

Review Comment:
   The `ackRequest` parameter could be renamed to `requestBody` (and the `@param` Javadoc tag updated to match).



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@shardingsphere.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org