You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@shardingsphere.apache.org by GitBox <gi...@apache.org> on 2022/12/02 06:33:17 UTC

[GitHub] [shardingsphere] sandynz commented on a diff in pull request #22579: Add CDC rule and login implemention

sandynz commented on code in PR #22579:
URL: https://github.com/apache/shardingsphere/pull/22579#discussion_r1037750361


##########
kernel/data-pipeline/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/Bootstrap.java:
##########
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.data.pipeline.cdc;
+
+import lombok.AccessLevel;
+import lombok.NoArgsConstructor;
+import org.apache.shardingsphere.data.pipeline.cdc.client.CDCClient;
+
+/**
+ * Client bootstrap.
+ */
+@NoArgsConstructor(access = AccessLevel.PRIVATE)
+public final class Bootstrap {

Review Comment:
   It's better to move `Bootstrap` into the `client` package, since there are several `Bootstrap` classes across modules.
   
   Or could we just merge `Bootstrap` into `CDCClient`?



##########
kernel/data-pipeline/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/Bootstrap.java:
##########
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.data.pipeline.cdc;
+
+import lombok.AccessLevel;
+import lombok.NoArgsConstructor;
+import org.apache.shardingsphere.data.pipeline.cdc.client.CDCClient;
+
+/**
+ * Client bootstrap.
+ */
+@NoArgsConstructor(access = AccessLevel.PRIVATE)
+public final class Bootstrap {
+    
+    /**
+     * Main entrance.
+     *
+     * @param args args
+     */
+    public static void main(final String[] args) {
+        int port = args.length > 0 ? Integer.parseInt(args[0]) : 33071;
+        String address = args.length > 1 ? args[1] : "127.0.0.1";
+        new CDCClient().start(port, address);

Review Comment:
   It's better to order the arguments as `address` first, then `port`.



##########
kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/generator/CDCResponseGenerator.java:
##########
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.data.pipeline.cdc.generator;
+
+import org.apache.shardingsphere.data.pipeline.cdc.common.CDCResponseErrorCode;
+import org.apache.shardingsphere.data.pipeline.cdc.proto.response.CDCResponse;
+import org.apache.shardingsphere.data.pipeline.cdc.proto.response.CDCResponse.Builder;
+import org.apache.shardingsphere.data.pipeline.cdc.proto.response.CDCResponse.Status;
+
+/**
+ * CDC response message generator.
+ */
+public final class CDCResponseGenerator {
+    
+    /**
+     * Succeed response builder.
+     *
+     * @param requestId request id
+     * @return success message
+     */
+    public static Builder succeedBuilder(final String requestId) {
+        Builder builder = CDCResponse.newBuilder();
+        builder.setStatus(Status.SUCCEED);
+        builder.setRequestId(requestId);
+        return builder;

Review Comment:
   `builder` should be `result`



##########
kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/constant/CDCOrder.java:
##########
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.data.pipeline.cdc.constant;
+
+/**
+ * CDC order.

Review Comment:
   It's better to add more detailed javadoc; the purpose of this class isn't clear from its name and fields alone.



##########
kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/generator/CDCResponseGenerator.java:
##########
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.data.pipeline.cdc.generator;
+
+import org.apache.shardingsphere.data.pipeline.cdc.common.CDCResponseErrorCode;
+import org.apache.shardingsphere.data.pipeline.cdc.proto.response.CDCResponse;
+import org.apache.shardingsphere.data.pipeline.cdc.proto.response.CDCResponse.Builder;
+import org.apache.shardingsphere.data.pipeline.cdc.proto.response.CDCResponse.Status;
+
+/**
+ * CDC response message generator.
+ */
+public final class CDCResponseGenerator {
+    
+    /**
+     * Succeed response builder.
+     *
+     * @param requestId request id
+     * @return success message

Review Comment:
   It seems the `@return` javadoc doesn't match the method.



##########
kernel/data-pipeline/cdc/protocol/pom.xml:
##########
@@ -27,4 +27,53 @@
     <artifactId>shardingsphere-data-pipeline-cdc-protocol</artifactId>
     <name>${project.artifactId}</name>
     
+    <properties>
+        <protobuf-java-util.version>3.17.1</protobuf-java-util.version>
+        <protobuf-java.version>3.17.1</protobuf-java.version>
+    </properties>
+    
+    <dependencies>
+        <dependency>
+            <groupId>com.google.protobuf</groupId>
+            <artifactId>protobuf-java</artifactId>
+            <version>${protobuf-java.version}</version>

Review Comment:
   dependency could be put in `dependencyManagement` of root pom.xml, and `<version>` could be removed here



##########
kernel/data-pipeline/cdc/protocol/pom.xml:
##########
@@ -27,4 +27,53 @@
     <artifactId>shardingsphere-data-pipeline-cdc-protocol</artifactId>
     <name>${project.artifactId}</name>
     
+    <properties>
+        <protobuf-java-util.version>3.17.1</protobuf-java-util.version>
+        <protobuf-java.version>3.17.1</protobuf-java.version>
+    </properties>
+    
+    <dependencies>
+        <dependency>
+            <groupId>com.google.protobuf</groupId>
+            <artifactId>protobuf-java</artifactId>
+            <version>${protobuf-java.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>com.google.protobuf</groupId>
+            <artifactId>protobuf-java-util</artifactId>
+            <version>${protobuf-java.version}</version>
+        </dependency>
+    </dependencies>
+    
+    <build>
+        <plugins>
+            <plugin>
+                <groupId>org.xolstice.maven.plugins</groupId>
+                <artifactId>protobuf-maven-plugin</artifactId>
+                <version>0.6.1</version>
+                <extensions>true</extensions>
+                <configuration>
+                    <additionalProtoPathElements>
+                        <additionalProtoPathElement>${project.basedir}/src/main/resources</additionalProtoPathElement>
+                    </additionalProtoPathElements>
+                    <protocArtifact>com.google.protobuf:protoc:3.17.1:exe:${os.detected.classifier}</protocArtifact>
+                </configuration>
+                <executions>
+                    <execution>
+                        <goals>
+                            <goal>compile</goal>
+                            <goal>test-compile</goal>
+                        </goals>
+                    </execution>
+                </executions>
+            </plugin>
+        </plugins>
+        <extensions>
+            <extension>
+                <groupId>kr.motd.maven</groupId>
+                <artifactId>os-maven-plugin</artifactId>
+                <version>1.6.0</version>

Review Comment:
   1. Could we put a link for `kr.motd.maven` in the PR, along with its license?
   
   2. `<version>` could be put in the root pom.xml.



##########
kernel/data-pipeline/cdc/protocol/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/common/CDCResponseErrorCode.java:
##########
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.data.pipeline.cdc.common;
+
+import lombok.Getter;
+import lombok.RequiredArgsConstructor;
+
+/**
+ * CDC response error code.
+ */
+@RequiredArgsConstructor
+public enum CDCResponseErrorCode {
+    
+    ILLEGAL_REQUEST_ERROR("-1"),
+    

Review Comment:
   Could we use a positive integer code, just like SERVER_ERROR?



##########
kernel/data-pipeline/cdc/protocol/src/main/proto/CDCRequestProto.proto:
##########
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+syntax = "proto3";
+
+option java_multiple_files = true;
+option java_outer_classname = "CDCRequestProto";
+option java_package = "org.apache.shardingsphere.data.pipeline.cdc.proto.request";
+
+message CDCRequest {
+  int32 version = 1; // proto version
+  string request_id = 2; // request id
+  enum Type {
+    UNKNOWN = 0;
+    LOGIN = 1;
+    CREATE_SUBSCRIPTION = 2; 
+    START_SUBSCRIPTION = 3; 
+    STOP_SUBSCRIPTION = 4; 
+    DROP_SUBSCRIPTION = 5;
+    FETCH_RECORDS = 6;
+    FETCH_RECORDS_ACK = 7; // The ack of fetch records
+  }
+  Type type = 3;
+  oneof request {
+    LoginRequest login = 4;
+    CreateSubscriptionRequest create_subscription = 5;
+    StartSubscriptionRequest start_subscription = 6;
+    FetchRecordAck fetch_record_ack = 7;
+    StopSubscriptionRequest stop_subscription = 8;
+    DropSubscriptionRequest drop_subscription = 9;
+  }
+}
+
+message LoginRequest {
+  enum LoginType {
+    UNKNOWN = 0;
+    BASIC = 1;
+  }
+  LoginType type = 1;
+  oneof body {
+    BasicBody basic_body = 2; // Username and password body
+  }
+
+  message BasicBody {
+    string username = 1;
+    string password = 2; // Encrypted password
+  }
+}
+
+message CreateSubscriptionRequest {
+  string database = 1;
+  repeated string tableNames = 2; // If a schema exists use . separates,eg. public.t_order
+  string subscriptionName = 3;
+  SubscriptionMode subscriptionMode = 4;
+
+  enum SubscriptionMode {
+    UNKNOWN = 0;
+    INCREMENTAL = 1;
+    FULL = 2;
+  }
+}
+
+message StartSubscriptionRequest {
+  string subscriptionName = 1;
+}
+
+message StopSubscriptionRequest {
+  string subscriptionName = 1;
+}
+
+message DropSubscriptionRequest {
+  string subscriptionName = 1;
+}
+
+message FetchRecordAck {

Review Comment:
   1. Why does `FetchRecordAck` have no `Request` suffix?
   
   2. What's the fetched data type: record or ack?
   



##########
kernel/data-pipeline/cdc/client/pom.xml:
##########
@@ -33,5 +33,20 @@
             <artifactId>shardingsphere-data-pipeline-cdc-protocol</artifactId>
             <version>${project.version}</version>
         </dependency>
+        <dependency>
+            <groupId>ch.qos.logback</groupId>
+            <artifactId>logback-classic</artifactId>
+            <scope>runtime</scope>
+        </dependency>

Review Comment:
   Is it necessary to add the logback dependency here?



##########
kernel/data-pipeline/cdc/protocol/src/main/proto/CDCResponseProto.proto:
##########
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+syntax = "proto3";
+
+import "google/protobuf/any.proto";
+
+option java_multiple_files = true;
+option java_outer_classname = "CDCResponseProto";
+option java_package = "org.apache.shardingsphere.data.pipeline.cdc.proto.response";
+
+message CDCResponse {
+  string request_id = 1;
+  enum Status {
+    UNKNOWN = 0;
+    SUCCEED = 1;
+    FAILED = 2;
+  }
+  Status status = 2;
+  oneof response {
+    ServerGreetingResult server_greeting_result = 3;
+    CreateSubscriptionResult create_subscription_result = 4;
+    FetchRecordResult fetch_record_result = 5;
+  }
+
+  optional string error_code = 100;
+  optional string error_message = 101;
+}
+
+message ServerGreetingResult {
+  string server_version = 1;
+  string max_protocol_version = 2;
+  string min_protocol_version = 3;
+}
+
+message CreateSubscriptionResult {
+  string subscriptionName = 1;
+  bool already_exists = 2;

Review Comment:
   Maybe `existing` is better



##########
kernel/data-pipeline/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/handler/LoginRequestHandler.java:
##########
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.data.pipeline.cdc.client.handler;
+
+import com.google.common.hash.Hashing;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelInboundHandlerAdapter;
+import io.netty.util.AttributeKey;
+import lombok.RequiredArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.shardingsphere.data.pipeline.cdc.client.event.CreateSubscriptionEvent;
+import org.apache.shardingsphere.data.pipeline.cdc.proto.request.CDCRequest;
+import org.apache.shardingsphere.data.pipeline.cdc.proto.request.CDCRequest.Type;
+import org.apache.shardingsphere.data.pipeline.cdc.proto.request.LoginRequest;
+import org.apache.shardingsphere.data.pipeline.cdc.proto.request.LoginRequest.BasicBody;
+import org.apache.shardingsphere.data.pipeline.cdc.proto.request.LoginRequest.LoginType;
+import org.apache.shardingsphere.data.pipeline.cdc.proto.response.CDCResponse;
+import org.apache.shardingsphere.data.pipeline.cdc.proto.response.CDCResponse.Status;
+import org.apache.shardingsphere.data.pipeline.cdc.proto.response.ServerGreetingResult;
+
+import java.util.Objects;
+import java.util.UUID;
+
+/**
+ * Login request handler.
+ */
+@Slf4j
+@RequiredArgsConstructor
+public final class LoginRequestHandler extends ChannelInboundHandlerAdapter {
+    
+    private static final AttributeKey<String> LOGIN_REQUEST_ID_KEY = AttributeKey.valueOf("login.request.id");
+    
+    private final String username;
+    
+    private final String password;
+    
+    private boolean loggedIn;
+    
+    @Override
+    public void channelInactive(final ChannelHandlerContext ctx) {
+        ctx.channel().attr(LOGIN_REQUEST_ID_KEY).set(null);
+    }
+    
+    @Override
+    public void channelRead(final ChannelHandlerContext ctx, final Object msg) {
+        if (loggedIn) {
+            ctx.fireChannelRead(msg);
+            return;
+        }
+        CDCResponse response = (CDCResponse) msg;
+        if (response.hasServerGreetingResult()) {
+            ServerGreetingResult serverGreetingResult = response.getServerGreetingResult();
+            log.info("Server greeting result, server version: {}, min protocol version: {}", serverGreetingResult.getServerVersion(), serverGreetingResult.getMinProtocolVersion());
+            sendLoginRequest(ctx);
+            return;
+        }
+        if (Status.FAILED == response.getStatus()) {
+            log.error("login failed, {}", msg);
+            return;
+        }
+        if (Status.SUCCEED == response.getStatus() && Objects.equals(ctx.channel().attr(LOGIN_REQUEST_ID_KEY).get(), response.getRequestId())) {
+            log.info("login success, username {}", username);
+            loggedIn = true;
+            ctx.fireUserEventTriggered(new CreateSubscriptionEvent());
+        }
+    }
+    
+    private void sendLoginRequest(final ChannelHandlerContext ctx) {
+        String encryptPassword = Hashing.sha256().hashBytes(password.getBytes()).toString();
+        LoginRequest loginRequest = LoginRequest.newBuilder().setType(LoginType.BASIC).setBasicBody(BasicBody.newBuilder().setUsername(username).setPassword(encryptPassword).build()).build();
+        String loginRequestId = UUID.randomUUID().toString();
+        ctx.channel().attr(LOGIN_REQUEST_ID_KEY).setIfAbsent(loginRequestId);
+        CDCRequest data = CDCRequest.newBuilder().setType(Type.LOGIN).setVersion(1).setRequestId(loginRequestId).setLogin(loginRequest).build();
+        ctx.writeAndFlush(data);

Review Comment:
   `writeAndFlush` is async; if it fails, will there be an exception stack trace in the log?



##########
kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/context/CDCConnectionContext.java:
##########
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.data.pipeline.cdc.context;
+
+import lombok.Getter;
+import lombok.Setter;
+import org.apache.shardingsphere.data.pipeline.cdc.constant.CDCConnectionStatus;
+
+/**
+ * CDC connection context.
+ */
+@Getter
+public final class CDCConnectionContext {
+    
+    @Setter
+    private CDCConnectionStatus status;

Review Comment:
   Is it possible to make `status` field `final`?



##########
kernel/data-pipeline/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/handler/SubscriptionRequestHandler.java:
##########
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.data.pipeline.cdc.client.handler;
+
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelInboundHandlerAdapter;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.shardingsphere.data.pipeline.cdc.client.event.CreateSubscriptionEvent;
+import org.apache.shardingsphere.data.pipeline.cdc.proto.request.CDCRequest;
+import org.apache.shardingsphere.data.pipeline.cdc.proto.request.CDCRequest.Builder;
+import org.apache.shardingsphere.data.pipeline.cdc.proto.request.CreateSubscriptionRequest;
+import org.apache.shardingsphere.data.pipeline.cdc.proto.request.CreateSubscriptionRequest.SubscriptionMode;
+import org.apache.shardingsphere.data.pipeline.cdc.proto.request.StartSubscriptionRequest;
+import org.apache.shardingsphere.data.pipeline.cdc.proto.response.CDCResponse;
+import org.apache.shardingsphere.data.pipeline.cdc.proto.response.CDCResponse.Status;
+
+import java.util.Arrays;
+import java.util.UUID;
+
+/**
+ * Subscription request handler.
+ */
+@Slf4j
+public final class SubscriptionRequestHandler extends ChannelInboundHandlerAdapter {
+    
+    @Override
+    public void userEventTriggered(final ChannelHandlerContext ctx, final Object evt) {
+        if (evt instanceof CreateSubscriptionEvent) {
+            Builder builder = CDCRequest.newBuilder();
+            builder.setCreateSubscription(buildCreateSubscriptionRequest());
+            builder.setRequestId(UUID.randomUUID().toString());

Review Comment:
   Could we add a requestId generator? There's a lot of hard-coded code for now.



##########
kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/config/CDCRuleConfiguration.java:
##########
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.data.pipeline.cdc.config;
+
+import lombok.Getter;
+import lombok.RequiredArgsConstructor;
+import org.apache.shardingsphere.infra.config.rule.scope.GlobalRuleConfiguration;
+
+/**
+ * CDC rule configuration.
+ */
+@RequiredArgsConstructor
+@Getter
+public final class CDCRuleConfiguration implements GlobalRuleConfiguration {
+    
+    private final boolean enable;

Review Comment:
   `enable` should be `enabled`, the same as rule configuration



##########
kernel/data-pipeline/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/handler/SubscriptionRequestHandler.java:
##########
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.data.pipeline.cdc.client.handler;
+
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelInboundHandlerAdapter;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.shardingsphere.data.pipeline.cdc.client.event.CreateSubscriptionEvent;
+import org.apache.shardingsphere.data.pipeline.cdc.proto.request.CDCRequest;
+import org.apache.shardingsphere.data.pipeline.cdc.proto.request.CDCRequest.Builder;
+import org.apache.shardingsphere.data.pipeline.cdc.proto.request.CreateSubscriptionRequest;
+import org.apache.shardingsphere.data.pipeline.cdc.proto.request.CreateSubscriptionRequest.SubscriptionMode;
+import org.apache.shardingsphere.data.pipeline.cdc.proto.request.StartSubscriptionRequest;
+import org.apache.shardingsphere.data.pipeline.cdc.proto.response.CDCResponse;
+import org.apache.shardingsphere.data.pipeline.cdc.proto.response.CDCResponse.Status;
+
+import java.util.Arrays;
+import java.util.UUID;
+
+/**
+ * Subscription request handler.
+ */
+@Slf4j
+public final class SubscriptionRequestHandler extends ChannelInboundHandlerAdapter {
+    
+    @Override
+    public void userEventTriggered(final ChannelHandlerContext ctx, final Object evt) {
+        if (evt instanceof CreateSubscriptionEvent) {
+            Builder builder = CDCRequest.newBuilder();
+            builder.setCreateSubscription(buildCreateSubscriptionRequest());
+            builder.setRequestId(UUID.randomUUID().toString());
+            ctx.writeAndFlush(builder.build());
+        }
+    }
+    
+    private CreateSubscriptionRequest buildCreateSubscriptionRequest() {
+        return CreateSubscriptionRequest.newBuilder().setSubscriptionMode(SubscriptionMode.INCREMENTAL).setSubscriptionName("sharding_db").setDatabase("sharding_db")
+                .addAllTableNames(Arrays.asList("t_order", "t_order_item")).build();

Review Comment:
   `sharding_db` and `t_order` should not be hard-coded here; maybe add a TODO for now?



##########
kernel/data-pipeline/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/CDCClient.java:
##########
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.data.pipeline.cdc.client;
+
+import io.netty.bootstrap.Bootstrap;
+import io.netty.buffer.PooledByteBufAllocator;
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelInitializer;
+import io.netty.channel.ChannelOption;
+import io.netty.channel.nio.NioEventLoopGroup;
+import io.netty.channel.socket.nio.NioSocketChannel;
+import io.netty.handler.codec.protobuf.ProtobufDecoder;
+import io.netty.handler.codec.protobuf.ProtobufEncoder;
+import io.netty.handler.codec.protobuf.ProtobufVarint32FrameDecoder;
+import io.netty.handler.codec.protobuf.ProtobufVarint32LengthFieldPrepender;
+import lombok.SneakyThrows;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.shardingsphere.data.pipeline.cdc.client.handler.LoginRequestHandler;
+import org.apache.shardingsphere.data.pipeline.cdc.client.handler.SubscriptionRequestHandler;
+import org.apache.shardingsphere.data.pipeline.cdc.proto.response.CDCResponse;
+
+/**
+ * CDC client.
+ */
+@Slf4j
+public final class CDCClient {
+    
+    /**
+     * Start ShardingSphere CDC client.
+     *
+     * @param port port
+     * @param address addresses
+     */
+    @SneakyThrows(InterruptedException.class)
+    public void start(final int port, final String address) {
+        startInternal(port, address);
+    }
+    
+    private void startInternal(final int port, final String address) throws InterruptedException {
+        Bootstrap bootstrap = new Bootstrap();
+        bootstrap.channel(NioSocketChannel.class)
+                .group(new NioEventLoopGroup())
+                .option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
+                .option(ChannelOption.SO_REUSEADDR, true)
+                .handler(new ChannelInitializer<NioSocketChannel>() {
+                    
+                    @Override
+                    protected void initChannel(final NioSocketChannel channel) {
+                        channel.pipeline().addLast(new ProtobufVarint32FrameDecoder());
+                        channel.pipeline().addLast(new ProtobufDecoder(CDCResponse.getDefaultInstance()));
+                        channel.pipeline().addLast(new ProtobufVarint32LengthFieldPrepender());
+                        channel.pipeline().addLast(new ProtobufEncoder());
+                        channel.pipeline().addLast(new LoginRequestHandler("root", "root"));

Review Comment:
   The CDC server account should not be hard-coded here; maybe add a TODO here?



##########
kernel/data-pipeline/cdc/protocol/src/main/proto/CDCResponseProto.proto:
##########
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+syntax = "proto3";
+
+import "google/protobuf/any.proto";
+
+option java_multiple_files = true;
+option java_outer_classname = "CDCResponseProto";
+option java_package = "org.apache.shardingsphere.data.pipeline.cdc.proto.response";
+
+message CDCResponse {
+  string request_id = 1;
+  enum Status {
+    UNKNOWN = 0;
+    SUCCEED = 1;
+    FAILED = 2;
+  }
+  Status status = 2;
+  oneof response {
+    ServerGreetingResult server_greeting_result = 3;
+    CreateSubscriptionResult create_subscription_result = 4;
+    FetchRecordResult fetch_record_result = 5;
+  }
+
+  optional string error_code = 100;
+  optional string error_message = 101;
+}
+
+message ServerGreetingResult {
+  string server_version = 1;
+  string max_protocol_version = 2;
+  string min_protocol_version = 3;

Review Comment:
   1, Is `server_version` required?
   
   2, Why are there both `max_protocol_version` and `min_protocol_version`? Is there a current protocol version?
   



##########
kernel/data-pipeline/cdc/protocol/src/main/proto/CDCResponseProto.proto:
##########
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+syntax = "proto3";
+
+import "google/protobuf/any.proto";
+
+option java_multiple_files = true;
+option java_outer_classname = "CDCResponseProto";
+option java_package = "org.apache.shardingsphere.data.pipeline.cdc.proto.response";
+
+message CDCResponse {
+  string request_id = 1;
+  enum Status {
+    UNKNOWN = 0;
+    SUCCEED = 1;
+    FAILED = 2;
+  }
+  Status status = 2;
+  oneof response {
+    ServerGreetingResult server_greeting_result = 3;
+    CreateSubscriptionResult create_subscription_result = 4;
+    FetchRecordResult fetch_record_result = 5;
+  }
+
+  optional string error_code = 100;
+  optional string error_message = 101;
+}
+
+message ServerGreetingResult {
+  string server_version = 1;
+  string max_protocol_version = 2;
+  string min_protocol_version = 3;
+}
+
+message CreateSubscriptionResult {
+  string subscriptionName = 1;
+  bool already_exists = 2;
+}
+
+message FetchRecordResult {
+  message Record {
+    map<string, google.protobuf.Any> before = 1;
+    map<string, google.protobuf.Any> after = 2;
+    message TableMetaData {
+      string databaseName = 1;
+      optional string schema = 2;
+      string tableName = 3;
+    }
+    TableMetaData tableMetaData = 3;
+    int64 ts_ms = 4;  // Transaction Commit Time
+    enum DataChangeType {
+      UNKNOWN = 0;
+      INSERT = 1;
+      UPDATE = 2;
+      DELETE = 3;
+      CREATE = 4;
+      ALTER = 5;
+      DROP = 6;

Review Comment:
   CREATE/ALTER/DROP table or index?



##########
proxy/frontend/core/src/main/java/org/apache/shardingsphere/proxy/frontend/ShardingSphereCDCServer.java:
##########
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.proxy.frontend;
+
+import io.netty.bootstrap.ServerBootstrap;
+import io.netty.buffer.PooledByteBufAllocator;
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelOption;
+import io.netty.channel.EventLoopGroup;
+import io.netty.channel.WriteBufferWaterMark;
+import io.netty.channel.epoll.Epoll;
+import io.netty.channel.epoll.EpollEventLoopGroup;
+import io.netty.channel.epoll.EpollServerSocketChannel;
+import io.netty.channel.nio.NioEventLoopGroup;
+import io.netty.channel.socket.nio.NioServerSocketChannel;
+import io.netty.handler.logging.LogLevel;
+import io.netty.handler.logging.LoggingHandler;
+import lombok.SneakyThrows;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.shardingsphere.proxy.frontend.netty.CDCServerHandlerInitializer;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * ShardingSphere CDC server.
+ */
+@Slf4j
+public final class ShardingSphereCDCServer {
+    

Review Comment:
   Could we remove the `ShardingSphere` prefix, just like `CDCClient`?



##########
kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/yaml/config/YamlCdcRuleConfiguration.java:
##########
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.data.pipeline.cdc.yaml.config;
+
+import lombok.Getter;
+import lombok.Setter;
+import org.apache.shardingsphere.data.pipeline.cdc.config.CDCRuleConfiguration;
+import org.apache.shardingsphere.infra.config.rule.RuleConfiguration;
+import org.apache.shardingsphere.infra.yaml.config.pojo.rule.YamlRuleConfiguration;
+
+/**
+ * CDC configuration for YAML.
+ */
+@Getter
+@Setter
+public final class YamlCdcRuleConfiguration implements YamlRuleConfiguration {
+    
+    private boolean enable;

Review Comment:
   1, `Cdc` in class name could be `CDC`?
   
   2, Rename `enable` (e.g. to `enabled`)



##########
proxy/frontend/core/src/main/java/org/apache/shardingsphere/proxy/frontend/netty/CDCChannelInboundHandler.java:
##########
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.proxy.frontend.netty;
+
+import com.google.common.hash.Hashing;
+import io.netty.channel.ChannelFutureListener;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelInboundHandlerAdapter;
+import io.netty.util.AttributeKey;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.shardingsphere.authority.rule.AuthorityRule;
+import org.apache.shardingsphere.data.pipeline.cdc.common.CDCResponseErrorCode;
+import org.apache.shardingsphere.data.pipeline.cdc.constant.CDCConnectionStatus;
+import org.apache.shardingsphere.data.pipeline.cdc.context.CDCConnectionContext;
+import org.apache.shardingsphere.data.pipeline.cdc.generator.CDCResponseGenerator;
+import org.apache.shardingsphere.data.pipeline.cdc.proto.request.CDCRequest;
+import org.apache.shardingsphere.data.pipeline.cdc.proto.request.LoginRequest.BasicBody;
+import org.apache.shardingsphere.data.pipeline.cdc.proto.response.CDCResponse;
+import org.apache.shardingsphere.data.pipeline.cdc.proto.response.CDCResponse.Builder;
+import org.apache.shardingsphere.data.pipeline.cdc.proto.response.CreateSubscriptionResult;
+import org.apache.shardingsphere.data.pipeline.cdc.proto.response.ServerGreetingResult;
+import org.apache.shardingsphere.infra.autogen.version.ShardingSphereVersion;
+import org.apache.shardingsphere.infra.metadata.user.Grantee;
+import org.apache.shardingsphere.infra.metadata.user.ShardingSphereUser;
+import org.apache.shardingsphere.infra.rule.ShardingSphereRule;
+import org.apache.shardingsphere.proxy.backend.context.ProxyContext;
+
+import java.net.InetSocketAddress;
+import java.net.SocketAddress;
+import java.util.Collection;
+import java.util.Objects;
+import java.util.Optional;
+
+@Slf4j
+public final class CDCChannelInboundHandler extends ChannelInboundHandlerAdapter {
+    
+    private static final AttributeKey<CDCConnectionContext> CONNECTION_CONTEXT_KEY = AttributeKey.valueOf("connection.context");
+    
+    @Override
+    public void channelActive(final ChannelHandlerContext ctx) {
+        CDCConnectionContext context = new CDCConnectionContext();
+        context.setStatus(CDCConnectionStatus.NOT_LOGGED_IN);
+        ctx.channel().attr(CONNECTION_CONTEXT_KEY).setIfAbsent(context);
+        Builder builder = CDCResponse.newBuilder();
+        builder.setServerGreetingResult(ServerGreetingResult.newBuilder().setServerVersion(ShardingSphereVersion.VERSION).setMaxProtocolVersion("1").setMinProtocolVersion("1").build());
+        ctx.writeAndFlush(builder.build());
+    }
+    
+    @Override
+    public void channelRead(final ChannelHandlerContext ctx, final Object msg) throws Exception {
+        CDCConnectionContext connectionContext = ctx.channel().attr(CONNECTION_CONTEXT_KEY).get();
+        CDCConnectionStatus status = connectionContext.getStatus();
+        CDCRequest request = (CDCRequest) msg;
+        if (CDCConnectionStatus.NOT_LOGGED_IN == status) {
+            processLogin(ctx, request);
+            return;
+        }
+        switch (request.getRequestCase()) {
+            case CREATE_SUBSCRIPTION:
+                processCreateSubscription(ctx, request);
+                break;
+            case START_SUBSCRIPTION:
+                processStartSubscription(ctx, request, connectionContext);
+                break;
+            case STOP_SUBSCRIPTION:
+                stopStartSubscription(ctx, request, connectionContext);
+                break;
+            case DROP_SUBSCRIPTION:
+                dropStartSubscription(ctx, request);
+                break;
+            case FETCH_RECORD_ACK:
+                break;
+            default:
+                log.warn("Cannot handle this type of request {}", request);
+        }
+    }
+    
+    private void processLogin(final ChannelHandlerContext ctx, final CDCRequest request) {
+        if (!request.hasLogin() || !request.getLogin().hasBasicBody()) {
+            ctx.writeAndFlush(CDCResponseGenerator.failed(request.getRequestId(), CDCResponseErrorCode.ILLEGAL_REQUEST_ERROR, "Miss login request body")).addListener(ChannelFutureListener.CLOSE);
+            return;
+        }
+        BasicBody body = request.getLogin().getBasicBody();
+        Collection<ShardingSphereRule> globalRules = ProxyContext.getInstance().getContextManager().getMetaDataContexts().getMetaData().getGlobalRuleMetaData().getRules();
+        Optional<AuthorityRule> authorityRule = globalRules.stream().filter(rule -> rule instanceof AuthorityRule).map(rule -> (AuthorityRule) rule).findFirst();
+        if (!authorityRule.isPresent()) {
+            ctx.writeAndFlush(CDCResponseGenerator.failed(request.getRequestId(), CDCResponseErrorCode.SERVER_ERROR, "Not find authority rule")).addListener(ChannelFutureListener.CLOSE);
+            return;
+        }
+        Optional<ShardingSphereUser> user = authorityRule.get().findUser(new Grantee(body.getUsername(), getHostAddress(ctx)));
+        if (user.isPresent() && Objects.equals(Hashing.sha256().hashBytes(user.get().getPassword().getBytes()).toString(), body.getPassword())) {

Review Comment:
   Is the sha256 result case-sensitive? It's better to convert it to uppercase or lowercase here, and also in the CDC client.



##########
proxy/frontend/core/src/main/java/org/apache/shardingsphere/proxy/frontend/netty/CDCChannelInboundHandler.java:
##########
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.proxy.frontend.netty;
+
+import com.google.common.hash.Hashing;
+import io.netty.channel.ChannelFutureListener;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelInboundHandlerAdapter;
+import io.netty.util.AttributeKey;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.shardingsphere.authority.rule.AuthorityRule;
+import org.apache.shardingsphere.data.pipeline.cdc.common.CDCResponseErrorCode;
+import org.apache.shardingsphere.data.pipeline.cdc.constant.CDCConnectionStatus;
+import org.apache.shardingsphere.data.pipeline.cdc.context.CDCConnectionContext;
+import org.apache.shardingsphere.data.pipeline.cdc.generator.CDCResponseGenerator;
+import org.apache.shardingsphere.data.pipeline.cdc.proto.request.CDCRequest;
+import org.apache.shardingsphere.data.pipeline.cdc.proto.request.LoginRequest.BasicBody;
+import org.apache.shardingsphere.data.pipeline.cdc.proto.response.CDCResponse;
+import org.apache.shardingsphere.data.pipeline.cdc.proto.response.CDCResponse.Builder;
+import org.apache.shardingsphere.data.pipeline.cdc.proto.response.CreateSubscriptionResult;
+import org.apache.shardingsphere.data.pipeline.cdc.proto.response.ServerGreetingResult;
+import org.apache.shardingsphere.infra.autogen.version.ShardingSphereVersion;
+import org.apache.shardingsphere.infra.metadata.user.Grantee;
+import org.apache.shardingsphere.infra.metadata.user.ShardingSphereUser;
+import org.apache.shardingsphere.infra.rule.ShardingSphereRule;
+import org.apache.shardingsphere.proxy.backend.context.ProxyContext;
+
+import java.net.InetSocketAddress;
+import java.net.SocketAddress;
+import java.util.Collection;
+import java.util.Objects;
+import java.util.Optional;
+
+@Slf4j
+public final class CDCChannelInboundHandler extends ChannelInboundHandlerAdapter {
+    
+    private static final AttributeKey<CDCConnectionContext> CONNECTION_CONTEXT_KEY = AttributeKey.valueOf("connection.context");
+    
+    @Override
+    public void channelActive(final ChannelHandlerContext ctx) {
+        CDCConnectionContext context = new CDCConnectionContext();
+        context.setStatus(CDCConnectionStatus.NOT_LOGGED_IN);
+        ctx.channel().attr(CONNECTION_CONTEXT_KEY).setIfAbsent(context);
+        Builder builder = CDCResponse.newBuilder();
+        builder.setServerGreetingResult(ServerGreetingResult.newBuilder().setServerVersion(ShardingSphereVersion.VERSION).setMaxProtocolVersion("1").setMinProtocolVersion("1").build());
+        ctx.writeAndFlush(builder.build());
+    }
+    
+    @Override
+    public void channelRead(final ChannelHandlerContext ctx, final Object msg) throws Exception {
+        CDCConnectionContext connectionContext = ctx.channel().attr(CONNECTION_CONTEXT_KEY).get();
+        CDCConnectionStatus status = connectionContext.getStatus();
+        CDCRequest request = (CDCRequest) msg;
+        if (CDCConnectionStatus.NOT_LOGGED_IN == status) {
+            processLogin(ctx, request);
+            return;
+        }
+        switch (request.getRequestCase()) {
+            case CREATE_SUBSCRIPTION:
+                processCreateSubscription(ctx, request);
+                break;
+            case START_SUBSCRIPTION:
+                processStartSubscription(ctx, request, connectionContext);
+                break;
+            case STOP_SUBSCRIPTION:
+                stopStartSubscription(ctx, request, connectionContext);
+                break;
+            case DROP_SUBSCRIPTION:
+                dropStartSubscription(ctx, request);
+                break;
+            case FETCH_RECORD_ACK:
+                break;
+            default:
+                log.warn("Cannot handle this type of request {}", request);
+        }
+    }
+    
+    private void processLogin(final ChannelHandlerContext ctx, final CDCRequest request) {
+        if (!request.hasLogin() || !request.getLogin().hasBasicBody()) {
+            ctx.writeAndFlush(CDCResponseGenerator.failed(request.getRequestId(), CDCResponseErrorCode.ILLEGAL_REQUEST_ERROR, "Miss login request body")).addListener(ChannelFutureListener.CLOSE);
+            return;
+        }
+        BasicBody body = request.getLogin().getBasicBody();
+        Collection<ShardingSphereRule> globalRules = ProxyContext.getInstance().getContextManager().getMetaDataContexts().getMetaData().getGlobalRuleMetaData().getRules();
+        Optional<AuthorityRule> authorityRule = globalRules.stream().filter(rule -> rule instanceof AuthorityRule).map(rule -> (AuthorityRule) rule).findFirst();
+        if (!authorityRule.isPresent()) {
+            ctx.writeAndFlush(CDCResponseGenerator.failed(request.getRequestId(), CDCResponseErrorCode.SERVER_ERROR, "Not find authority rule")).addListener(ChannelFutureListener.CLOSE);
+            return;
+        }
+        Optional<ShardingSphereUser> user = authorityRule.get().findUser(new Grantee(body.getUsername(), getHostAddress(ctx)));
+        if (user.isPresent() && Objects.equals(Hashing.sha256().hashBytes(user.get().getPassword().getBytes()).toString(), body.getPassword())) {
+            CDCConnectionContext connectionContext = ctx.channel().attr(CONNECTION_CONTEXT_KEY).get();
+            connectionContext.setStatus(CDCConnectionStatus.LOGGED_IN);
+            ctx.channel().attr(CONNECTION_CONTEXT_KEY).set(connectionContext);

Review Comment:
   It looks like `CONNECTION_CONTEXT_KEY` is already set on `ctx.channel()`, so why set it again?



##########
kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/rule/CDCRule.java:
##########
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.data.pipeline.cdc.rule;
+
+import lombok.Getter;
+import lombok.RequiredArgsConstructor;
+import org.apache.shardingsphere.data.pipeline.cdc.config.CDCRuleConfiguration;
+import org.apache.shardingsphere.infra.config.rule.RuleConfiguration;
+import org.apache.shardingsphere.infra.rule.identifier.scope.GlobalRule;
+
+/**
+ * CDC rule.
+ */
+@RequiredArgsConstructor
+@Getter
+public final class CDCRule implements GlobalRule {
+    
+    private final boolean enable;

Review Comment:
   `enable` should be `enabled`



##########
kernel/data-pipeline/cdc/protocol/pom.xml:
##########
@@ -27,4 +27,53 @@
     <artifactId>shardingsphere-data-pipeline-cdc-protocol</artifactId>
     <name>${project.artifactId}</name>
     
+    <properties>
+        <protobuf-java-util.version>3.17.1</protobuf-java-util.version>
+        <protobuf-java.version>3.17.1</protobuf-java.version>
+    </properties>

Review Comment:
   1, properties should be put in root pom.xml
   
   2, Could we merge these two versions?
   



##########
proxy/frontend/core/src/test/java/org/apache/shardingsphere/proxy/frontend/netty/CDCChannelInboundHandlerTest.java:
##########
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.proxy.frontend.netty;
+
+import com.google.common.hash.Hashing;
+import io.netty.channel.embedded.EmbeddedChannel;
+import io.netty.handler.logging.LoggingHandler;
+import org.apache.shardingsphere.authority.rule.AuthorityRule;
+import org.apache.shardingsphere.data.pipeline.cdc.proto.request.CDCRequest;
+import org.apache.shardingsphere.data.pipeline.cdc.proto.request.CDCRequest.Builder;
+import org.apache.shardingsphere.data.pipeline.cdc.proto.request.LoginRequest;
+import org.apache.shardingsphere.data.pipeline.cdc.proto.request.LoginRequest.BasicBody;
+import org.apache.shardingsphere.data.pipeline.cdc.proto.response.CDCResponse;
+import org.apache.shardingsphere.data.pipeline.cdc.proto.response.CDCResponse.Status;
+import org.apache.shardingsphere.infra.metadata.database.rule.ShardingSphereRuleMetaData;
+import org.apache.shardingsphere.infra.metadata.user.Grantee;
+import org.apache.shardingsphere.infra.metadata.user.ShardingSphereUser;
+import org.apache.shardingsphere.infra.rule.ShardingSphereRule;
+import org.apache.shardingsphere.proxy.backend.context.ProxyContext;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.mockito.MockedStatic;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Optional;
+
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.RETURNS_DEEP_STUBS;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.mockStatic;
+import static org.mockito.Mockito.when;
+
+public final class CDCChannelInboundHandlerTest {
+    
+    private static MockedStatic<ProxyContext> proxyContext;
+    
+    private final CDCChannelInboundHandler cdcChannelInboundHandler = new CDCChannelInboundHandler();
+    
+    private EmbeddedChannel channel;
+    
+    @BeforeClass
+    public static void beforeClass() {
+        proxyContext = mockStatic(ProxyContext.class);
+        ProxyContext mockedProxyContext = mock(ProxyContext.class, RETURNS_DEEP_STUBS);
+        proxyContext.when(ProxyContext::getInstance).thenReturn(mockedProxyContext);
+        ShardingSphereRuleMetaData globalRuleMetaData = mock(ShardingSphereRuleMetaData.class);
+        when(mockedProxyContext.getContextManager().getMetaDataContexts().getMetaData().getGlobalRuleMetaData()).thenReturn(globalRuleMetaData);
+        List<ShardingSphereRule> rules = Collections.singletonList(mockAuthRule());
+        when(globalRuleMetaData.getRules()).thenReturn(rules);
+    }
+    
+    private static AuthorityRule mockAuthRule() {
+        AuthorityRule result = mock(AuthorityRule.class);
+        ShardingSphereUser mockUser = mock(ShardingSphereUser.class);
+        when(mockUser.getGrantee()).thenReturn(new Grantee("root", "%"));
+        when(mockUser.getPassword()).thenReturn("root");
+        when(result.findUser(any())).thenReturn(Optional.of(mockUser));
+        return result;
+    }
+    
+    @AfterClass
+    public static void afterClass() {
+        proxyContext.close();
+    }
+    
+    @Before
+    public void setup() {
+        channel = new EmbeddedChannel(new LoggingHandler(), cdcChannelInboundHandler);
+    }
+    
+    @Test
+    public void assertLoginRequestFailed() {
+        CDCRequest actualRequest = CDCRequest.newBuilder().setLogin(LoginRequest.newBuilder().setBasicBody(BasicBody.newBuilder().setUsername("root2").build()).build()).build();
+        channel.writeInbound(actualRequest);
+        CDCResponse expectedGreetingResult = channel.readOutbound();
+        assertTrue(expectedGreetingResult.hasServerGreetingResult());
+        CDCResponse expectedLoginResult = channel.readOutbound();
+        assertThat(expectedLoginResult.getStatus(), is(Status.FAILED));
+        assertThat(expectedLoginResult.getErrorCode(), is("1"));

Review Comment:
   `1` could be replaced with a reference to the enum code



##########
kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/generator/CDCResponseGenerator.java:
##########
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.data.pipeline.cdc.generator;
+
+import org.apache.shardingsphere.data.pipeline.cdc.common.CDCResponseErrorCode;
+import org.apache.shardingsphere.data.pipeline.cdc.proto.response.CDCResponse;
+import org.apache.shardingsphere.data.pipeline.cdc.proto.response.CDCResponse.Builder;
+import org.apache.shardingsphere.data.pipeline.cdc.proto.response.CDCResponse.Status;
+
+/**
+ * CDC response message generator.
+ */
+public final class CDCResponseGenerator {
+    
+    /**
+     * Succeed response builder.
+     *
+     * @param requestId request id
+     * @return success message
+     */
+    public static Builder succeedBuilder(final String requestId) {
+        Builder builder = CDCResponse.newBuilder();
+        builder.setStatus(Status.SUCCEED);
+        builder.setRequestId(requestId);
+        return builder;
+    }
+    
+    /**
+     * Failed response.
+     *
+     * @param requestId request id
+     * @param errorCode error code
+     * @param errorMessage error message
+     * @return failed response
+     */
+    public static CDCResponse failed(final String requestId, final CDCResponseErrorCode errorCode, final String errorMessage) {
+        Builder builder = CDCResponse.newBuilder();
+        builder.setStatus(Status.FAILED);
+        builder.setRequestId(requestId);
+        builder.setErrorCode(errorCode.getCode());
+        builder.setErrorMessage(errorMessage);
+        return builder.build();

Review Comment:
   Might it be shorter?



##########
kernel/data-pipeline/cdc/protocol/src/main/proto/CDCRequestProto.proto:
##########
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+syntax = "proto3";
+
+option java_multiple_files = true;
+option java_outer_classname = "CDCRequestProto";
+option java_package = "org.apache.shardingsphere.data.pipeline.cdc.proto.request";

Review Comment:
   And also class name `CDCRequestProto`



##########
proxy/frontend/core/src/test/java/org/apache/shardingsphere/proxy/frontend/netty/CDCChannelInboundHandlerTest.java:
##########
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.proxy.frontend.netty;
+
+import com.google.common.hash.Hashing;
+import io.netty.channel.embedded.EmbeddedChannel;
+import io.netty.handler.logging.LoggingHandler;
+import org.apache.shardingsphere.authority.rule.AuthorityRule;
+import org.apache.shardingsphere.data.pipeline.cdc.proto.request.CDCRequest;
+import org.apache.shardingsphere.data.pipeline.cdc.proto.request.CDCRequest.Builder;
+import org.apache.shardingsphere.data.pipeline.cdc.proto.request.LoginRequest;
+import org.apache.shardingsphere.data.pipeline.cdc.proto.request.LoginRequest.BasicBody;
+import org.apache.shardingsphere.data.pipeline.cdc.proto.response.CDCResponse;
+import org.apache.shardingsphere.data.pipeline.cdc.proto.response.CDCResponse.Status;
+import org.apache.shardingsphere.infra.metadata.database.rule.ShardingSphereRuleMetaData;
+import org.apache.shardingsphere.infra.metadata.user.Grantee;
+import org.apache.shardingsphere.infra.metadata.user.ShardingSphereUser;
+import org.apache.shardingsphere.infra.rule.ShardingSphereRule;
+import org.apache.shardingsphere.proxy.backend.context.ProxyContext;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.mockito.MockedStatic;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Optional;
+
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.RETURNS_DEEP_STUBS;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.mockStatic;
+import static org.mockito.Mockito.when;
+
+public final class CDCChannelInboundHandlerTest {
+    
+    private static MockedStatic<ProxyContext> proxyContext;
+    
+    private final CDCChannelInboundHandler cdcChannelInboundHandler = new CDCChannelInboundHandler();
+    
+    private EmbeddedChannel channel;
+    
+    @BeforeClass
+    public static void beforeClass() {
+        proxyContext = mockStatic(ProxyContext.class);
+        ProxyContext mockedProxyContext = mock(ProxyContext.class, RETURNS_DEEP_STUBS);
+        proxyContext.when(ProxyContext::getInstance).thenReturn(mockedProxyContext);
+        ShardingSphereRuleMetaData globalRuleMetaData = mock(ShardingSphereRuleMetaData.class);
+        when(mockedProxyContext.getContextManager().getMetaDataContexts().getMetaData().getGlobalRuleMetaData()).thenReturn(globalRuleMetaData);
+        List<ShardingSphereRule> rules = Collections.singletonList(mockAuthRule());
+        when(globalRuleMetaData.getRules()).thenReturn(rules);
+    }
+    
+    private static AuthorityRule mockAuthRule() {
+        AuthorityRule result = mock(AuthorityRule.class);
+        ShardingSphereUser mockUser = mock(ShardingSphereUser.class);
+        when(mockUser.getGrantee()).thenReturn(new Grantee("root", "%"));
+        when(mockUser.getPassword()).thenReturn("root");
+        when(result.findUser(any())).thenReturn(Optional.of(mockUser));
+        return result;
+    }
+    
+    @AfterClass
+    public static void afterClass() {
+        proxyContext.close();
+    }
+    
+    @Before
+    public void setup() {
+        channel = new EmbeddedChannel(new LoggingHandler(), cdcChannelInboundHandler);
+    }
+    
+    @Test
+    public void assertLoginRequestFailed() {
+        CDCRequest actualRequest = CDCRequest.newBuilder().setLogin(LoginRequest.newBuilder().setBasicBody(BasicBody.newBuilder().setUsername("root2").build()).build()).build();
+        channel.writeInbound(actualRequest);
+        CDCResponse expectedGreetingResult = channel.readOutbound();
+        assertTrue(expectedGreetingResult.hasServerGreetingResult());
+        CDCResponse expectedLoginResult = channel.readOutbound();
+        assertThat(expectedLoginResult.getStatus(), is(Status.FAILED));
+        assertThat(expectedLoginResult.getErrorCode(), is("1"));
+        assertFalse(channel.isOpen());
+    }
+    
+    @Test
+    public void assertIllegalLoginRequest() {
+        CDCRequest actualRequest = CDCRequest.newBuilder().setVersion(1).setRequestId("test").build();
+        channel.writeInbound(actualRequest);
+        CDCResponse expectedGreetingResult = channel.readOutbound();
+        assertTrue(expectedGreetingResult.hasServerGreetingResult());
+        CDCResponse expectedLoginResult = channel.readOutbound();
+        assertThat(expectedLoginResult.getStatus(), is(Status.FAILED));
+        assertThat(expectedLoginResult.getErrorCode(), is("-1"));

Review Comment:
   `-1` could be replaced with an enum code reference



##########
kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/yaml/swapper/YamlCdcRuleConfigurationSwapper.java:
##########
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.data.pipeline.cdc.yaml.swapper;
+
+import org.apache.shardingsphere.data.pipeline.cdc.config.CDCRuleConfiguration;
+import org.apache.shardingsphere.data.pipeline.cdc.constant.CDCOrder;
+import org.apache.shardingsphere.data.pipeline.cdc.yaml.config.YamlCdcRuleConfiguration;
+import org.apache.shardingsphere.infra.yaml.config.swapper.rule.YamlRuleConfigurationSwapper;
+
+/**
+ * YAML CDC rule configuration swapper.
+ */
+public final class YamlCdcRuleConfigurationSwapper implements YamlRuleConfigurationSwapper<YamlCdcRuleConfiguration, CDCRuleConfiguration> {

Review Comment:
   `Cdc` in class name could be `CDC`? Keep it consistent with configuration class



##########
kernel/data-pipeline/cdc/protocol/src/main/proto/CDCRequestProto.proto:
##########
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+syntax = "proto3";
+
+option java_multiple_files = true;
+option java_outer_classname = "CDCRequestProto";
+option java_package = "org.apache.shardingsphere.data.pipeline.cdc.proto.request";
+
+message CDCRequest {
+  int32 version = 1; // proto version
+  string request_id = 2; // request id
+  enum Type {
+    UNKNOWN = 0;
+    LOGIN = 1;
+    CREATE_SUBSCRIPTION = 2; 
+    START_SUBSCRIPTION = 3; 
+    STOP_SUBSCRIPTION = 4; 
+    DROP_SUBSCRIPTION = 5;
+    FETCH_RECORDS = 6;
+    FETCH_RECORDS_ACK = 7; // The ack of fetch records
+  }
+  Type type = 3;
+  oneof request {
+    LoginRequest login = 4;
+    CreateSubscriptionRequest create_subscription = 5;
+    StartSubscriptionRequest start_subscription = 6;
+    FetchRecordAck fetch_record_ack = 7;
+    StopSubscriptionRequest stop_subscription = 8;
+    DropSubscriptionRequest drop_subscription = 9;
+  }
+}
+
+message LoginRequest {
+  enum LoginType {
+    UNKNOWN = 0;
+    BASIC = 1;
+  }
+  LoginType type = 1;
+  oneof body {
+    BasicBody basic_body = 2; // Username and password body
+  }
+
+  message BasicBody {
+    string username = 1;
+    string password = 2; // Encrypted password
+  }
+}
+
+message CreateSubscriptionRequest {
+  string database = 1;
+  repeated string tableNames = 2; // If a schema exists use . separates,eg. public.t_order
+  string subscriptionName = 3;
+  SubscriptionMode subscriptionMode = 4;
+
+  enum SubscriptionMode {
+    UNKNOWN = 0;
+    INCREMENTAL = 1;
+    FULL = 2;
+  }
+}
+
+message StartSubscriptionRequest {
+  string subscriptionName = 1;
+}
+
+message StopSubscriptionRequest {
+  string subscriptionName = 1;
+}
+
+message DropSubscriptionRequest {
+  string subscriptionName = 1;
+}
+
+message FetchRecordAck {
+  string subscriptionName = 1;
+  string lastReceivePosition = 2;

Review Comment:
   `lastReceivePosition` could be `lastReceivedPosition`



##########
kernel/data-pipeline/cdc/protocol/src/main/proto/CDCRequestProto.proto:
##########
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+syntax = "proto3";
+
+option java_multiple_files = true;
+option java_outer_classname = "CDCRequestProto";
+option java_package = "org.apache.shardingsphere.data.pipeline.cdc.proto.request";

Review Comment:
   `proto` could be `protocol`; there's no `proto` package name in the current code



##########
kernel/data-pipeline/cdc/protocol/src/main/proto/CDCRequestProto.proto:
##########
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+syntax = "proto3";
+
+option java_multiple_files = true;
+option java_outer_classname = "CDCRequestProto";
+option java_package = "org.apache.shardingsphere.data.pipeline.cdc.proto.request";
+
+message CDCRequest {
+  int32 version = 1; // proto version
+  string request_id = 2; // request id
+  enum Type {
+    UNKNOWN = 0;
+    LOGIN = 1;
+    CREATE_SUBSCRIPTION = 2; 
+    START_SUBSCRIPTION = 3; 
+    STOP_SUBSCRIPTION = 4; 
+    DROP_SUBSCRIPTION = 5;
+    FETCH_RECORDS = 6;
+    FETCH_RECORDS_ACK = 7; // The ack of fetch records
+  }
+  Type type = 3;
+  oneof request {
+    LoginRequest login = 4;
+    CreateSubscriptionRequest create_subscription = 5;
+    StartSubscriptionRequest start_subscription = 6;
+    FetchRecordAck fetch_record_ack = 7;
+    StopSubscriptionRequest stop_subscription = 8;
+    DropSubscriptionRequest drop_subscription = 9;
+  }
+}
+
+message LoginRequest {
+  enum LoginType {
+    UNKNOWN = 0;
+    BASIC = 1;
+  }
+  LoginType type = 1;
+  oneof body {
+    BasicBody basic_body = 2; // Username and password body
+  }
+
+  message BasicBody {
+    string username = 1;
+    string password = 2; // Encrypted password
+  }
+}
+
+message CreateSubscriptionRequest {
+  string database = 1;
+  repeated string tableNames = 2; // If a schema exists use . separates,eg. public.t_order

Review Comment:
   1, Could we add schema in `tableNames` field?
   
   2, `,` is invalid, it should be ASCII character
   



##########
kernel/data-pipeline/cdc/protocol/pom.xml:
##########
@@ -27,4 +27,53 @@
     <artifactId>shardingsphere-data-pipeline-cdc-protocol</artifactId>
     <name>${project.artifactId}</name>
     
+    <properties>
+        <protobuf-java-util.version>3.17.1</protobuf-java-util.version>
+        <protobuf-java.version>3.17.1</protobuf-java.version>
+    </properties>
+    
+    <dependencies>
+        <dependency>
+            <groupId>com.google.protobuf</groupId>
+            <artifactId>protobuf-java</artifactId>
+            <version>${protobuf-java.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>com.google.protobuf</groupId>
+            <artifactId>protobuf-java-util</artifactId>
+            <version>${protobuf-java.version}</version>
+        </dependency>
+    </dependencies>
+    
+    <build>
+        <plugins>
+            <plugin>
+                <groupId>org.xolstice.maven.plugins</groupId>
+                <artifactId>protobuf-maven-plugin</artifactId>
+                <version>0.6.1</version>

Review Comment:
   1, Could we add an `org.xolstice.maven.plugins` related link in the PR, and also the license?
   
   2, `<version>` could be put in root pom.xml
   



##########
kernel/data-pipeline/cdc/protocol/src/main/proto/CDCResponseProto.proto:
##########
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+syntax = "proto3";
+
+import "google/protobuf/any.proto";
+
+option java_multiple_files = true;
+option java_outer_classname = "CDCResponseProto";
+option java_package = "org.apache.shardingsphere.data.pipeline.cdc.proto.response";
+
+message CDCResponse {
+  string request_id = 1;
+  enum Status {
+    UNKNOWN = 0;
+    SUCCEED = 1;
+    FAILED = 2;
+  }
+  Status status = 2;
+  oneof response {
+    ServerGreetingResult server_greeting_result = 3;
+    CreateSubscriptionResult create_subscription_result = 4;
+    FetchRecordResult fetch_record_result = 5;
+  }
+
+  optional string error_code = 100;
+  optional string error_message = 101;
+}
+
+message ServerGreetingResult {
+  string server_version = 1;
+  string max_protocol_version = 2;
+  string min_protocol_version = 3;
+}
+
+message CreateSubscriptionResult {
+  string subscriptionName = 1;
+  bool already_exists = 2;
+}
+
+message FetchRecordResult {
+  message Record {
+    map<string, google.protobuf.Any> before = 1;
+    map<string, google.protobuf.Any> after = 2;
+    message TableMetaData {
+      string databaseName = 1;
+      optional string schema = 2;
+      string tableName = 3;
+    }
+    TableMetaData tableMetaData = 3;
+    int64 ts_ms = 4;  // Transaction Commit Time

Review Comment:
   `ts_ms` should be more meaningful



##########
kernel/data-pipeline/cdc/protocol/src/main/proto/CDCResponseProto.proto:
##########
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+syntax = "proto3";
+
+import "google/protobuf/any.proto";
+
+option java_multiple_files = true;
+option java_outer_classname = "CDCResponseProto";
+option java_package = "org.apache.shardingsphere.data.pipeline.cdc.proto.response";
+
+message CDCResponse {
+  string request_id = 1;
+  enum Status {
+    UNKNOWN = 0;
+    SUCCEED = 1;
+    FAILED = 2;
+  }
+  Status status = 2;
+  oneof response {
+    ServerGreetingResult server_greeting_result = 3;
+    CreateSubscriptionResult create_subscription_result = 4;
+    FetchRecordResult fetch_record_result = 5;
+  }
+
+  optional string error_code = 100;
+  optional string error_message = 101;

Review Comment:
   Why do they start from `100`? Could they use `14` and `15`?



##########
kernel/data-pipeline/cdc/protocol/src/main/proto/CDCResponseProto.proto:
##########
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+syntax = "proto3";
+
+import "google/protobuf/any.proto";
+
+option java_multiple_files = true;
+option java_outer_classname = "CDCResponseProto";
+option java_package = "org.apache.shardingsphere.data.pipeline.cdc.proto.response";
+
+message CDCResponse {
+  string request_id = 1;
+  enum Status {
+    UNKNOWN = 0;
+    SUCCEED = 1;
+    FAILED = 2;
+  }
+  Status status = 2;
+  oneof response {
+    ServerGreetingResult server_greeting_result = 3;
+    CreateSubscriptionResult create_subscription_result = 4;
+    FetchRecordResult fetch_record_result = 5;
+  }
+
+  optional string error_code = 100;
+  optional string error_message = 101;
+}
+
+message ServerGreetingResult {
+  string server_version = 1;
+  string max_protocol_version = 2;
+  string min_protocol_version = 3;
+}
+
+message CreateSubscriptionResult {
+  string subscriptionName = 1;
+  bool already_exists = 2;
+}
+
+message FetchRecordResult {
+  message Record {
+    map<string, google.protobuf.Any> before = 1;
+    map<string, google.protobuf.Any> after = 2;
+    message TableMetaData {
+      string databaseName = 1;

Review Comment:
   It's `database` in `CreateSubscriptionRequest`, but `databaseName` here; could they be kept consistent?



##########
kernel/data-pipeline/cdc/protocol/src/main/proto/CDCRequestProto.proto:
##########
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+syntax = "proto3";
+
+option java_multiple_files = true;
+option java_outer_classname = "CDCRequestProto";
+option java_package = "org.apache.shardingsphere.data.pipeline.cdc.proto.request";
+
+message CDCRequest {
+  int32 version = 1; // proto version
+  string request_id = 2; // request id
+  enum Type {
+    UNKNOWN = 0;
+    LOGIN = 1;
+    CREATE_SUBSCRIPTION = 2; 
+    START_SUBSCRIPTION = 3; 
+    STOP_SUBSCRIPTION = 4; 
+    DROP_SUBSCRIPTION = 5;
+    FETCH_RECORDS = 6;
+    FETCH_RECORDS_ACK = 7; // The ack of fetch records

Review Comment:
   Comments could be removed; if a field name is not clear enough, then we could improve the field name



##########
proxy/frontend/core/src/main/java/org/apache/shardingsphere/proxy/frontend/ShardingSphereCDCServer.java:
##########
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.proxy.frontend;
+
+import io.netty.bootstrap.ServerBootstrap;
+import io.netty.buffer.PooledByteBufAllocator;
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelOption;
+import io.netty.channel.EventLoopGroup;
+import io.netty.channel.WriteBufferWaterMark;
+import io.netty.channel.epoll.Epoll;
+import io.netty.channel.epoll.EpollEventLoopGroup;
+import io.netty.channel.epoll.EpollServerSocketChannel;
+import io.netty.channel.nio.NioEventLoopGroup;
+import io.netty.channel.socket.nio.NioServerSocketChannel;
+import io.netty.handler.logging.LogLevel;
+import io.netty.handler.logging.LoggingHandler;
+import lombok.SneakyThrows;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.shardingsphere.proxy.frontend.netty.CDCServerHandlerInitializer;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * ShardingSphere CDC server.
+ */
+@Slf4j
+public final class ShardingSphereCDCServer {
+    
+    private EventLoopGroup bossGroup;
+    
+    private EventLoopGroup workerGroup;
+    
+    /**
+     * Start ShardingSphere CDC server.
+     *
+     * @param port port
+     * @param addresses addresses
+     * @return futures
+     */
+    @SneakyThrows(InterruptedException.class)
+    public List<ChannelFuture> startAsync(final int port, final List<String> addresses) {
+        return startInternal(port, addresses);
+    }
+    
+    private List<ChannelFuture> startInternal(final int port, final List<String> addresses) throws InterruptedException {

Review Comment:
   `port` parameter could be put after `addresses`



##########
kernel/data-pipeline/cdc/protocol/src/main/proto/CDCResponseProto.proto:
##########
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+syntax = "proto3";
+
+import "google/protobuf/any.proto";
+
+option java_multiple_files = true;
+option java_outer_classname = "CDCResponseProto";
+option java_package = "org.apache.shardingsphere.data.pipeline.cdc.proto.response";

Review Comment:
   `proto` could be improved



##########
proxy/frontend/core/src/main/java/org/apache/shardingsphere/proxy/frontend/ShardingSphereProxy.java:
##########
@@ -62,6 +66,12 @@ public final class ShardingSphereProxy {
     public void start(final int port, final List<String> addresses) {
         try {
             List<ChannelFuture> futures = startInternal(port, addresses);
+            Optional<CDCRule> cdcConfig = ProxyContext.getInstance().getContextManager().getMetaDataContexts().getMetaData().getGlobalRuleMetaData().getRules().stream()
+                    .filter(each -> each instanceof CDCRule).map(each -> (CDCRule) each).findFirst();
+            // TODO cdc is actually independent, but coupled with the proxy code
+            if (cdcConfig.isPresent() && cdcConfig.get().isEnable()) {
+                futures.addAll(new ShardingSphereCDCServer().startAsync(cdcConfig.get().getPort(), addresses));
+            }

Review Comment:
   1, It should not be added here, since `CDCServer` is similar to `ShardingSphereProxy`
   
   2, When will CDCServer's `bossGroup` and `workerGroup` be shut down?
   



##########
proxy/bootstrap/src/main/resources/conf/server.yaml:
##########
@@ -42,6 +42,10 @@
 #  privilege:
 #    type: ALL_PERMITTED
 #
+#cdc:
+#  enable: false

Review Comment:
   1, `enable` should be `enabled`
   
   2, CDC configuration could be put after `sqlParser`
   



##########
kernel/data-pipeline/cdc/protocol/src/main/proto/CDCResponseProto.proto:
##########
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+syntax = "proto3";
+
+import "google/protobuf/any.proto";
+
+option java_multiple_files = true;
+option java_outer_classname = "CDCResponseProto";
+option java_package = "org.apache.shardingsphere.data.pipeline.cdc.proto.response";
+
+message CDCResponse {
+  string request_id = 1;
+  enum Status {
+    UNKNOWN = 0;
+    SUCCEED = 1;
+    FAILED = 2;
+  }
+  Status status = 2;
+  oneof response {
+    ServerGreetingResult server_greeting_result = 3;
+    CreateSubscriptionResult create_subscription_result = 4;
+    FetchRecordResult fetch_record_result = 5;
+  }
+
+  optional string error_code = 100;
+  optional string error_message = 101;
+}
+
+message ServerGreetingResult {
+  string server_version = 1;
+  string max_protocol_version = 2;
+  string min_protocol_version = 3;
+}
+
+message CreateSubscriptionResult {
+  string subscriptionName = 1;
+  bool already_exists = 2;
+}
+
+message FetchRecordResult {
+  message Record {
+    map<string, google.protobuf.Any> before = 1;
+    map<string, google.protobuf.Any> after = 2;
+    message TableMetaData {
+      string databaseName = 1;
+      optional string schema = 2;
+      string tableName = 3;
+    }
+    TableMetaData tableMetaData = 3;
+    int64 ts_ms = 4;  // Transaction Commit Time
+    enum DataChangeType {
+      UNKNOWN = 0;
+      INSERT = 1;
+      UPDATE = 2;
+      DELETE = 3;
+      CREATE = 4;
+      ALTER = 5;
+      DROP = 6;
+    }
+    bool isDdl = 6; // When isDdl is true, the message contains a DDL sql.
+    optional string sql = 7; // This field has a value only when isDdl is true

Review Comment:
   Consider renaming 1) `isDdl` to `isDDL`, and 2) `sql` to `ddlSQL`.



##########
proxy/frontend/core/src/main/java/org/apache/shardingsphere/proxy/frontend/ShardingSphereProxy.java:
##########
@@ -41,11 +43,13 @@
 
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Optional;
 
 /**
  * ShardingSphere-Proxy.
  */
 @Slf4j
+@RequiredArgsConstructor

Review Comment:
   Is `@RequiredArgsConstructor` required?



##########
proxy/backend/src/test/java/org/apache/shardingsphere/proxy/backend/config/ProxyConfigurationLoaderTest.java:
##########
@@ -51,6 +51,7 @@ public void assertLoadEmptyConfiguration() throws IOException {
         assertNull(serverConfig.getLabels());
         assertTrue(serverConfig.getProps().isEmpty());
         assertTrue(serverConfig.getRules().isEmpty());
+        assertNull(serverConfig.getCdc());

Review Comment:
   It's better to put this after `getAuthority`.



##########
proxy/frontend/core/src/main/java/org/apache/shardingsphere/proxy/frontend/netty/CDCChannelInboundHandler.java:
##########
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.proxy.frontend.netty;
+
+import com.google.common.hash.Hashing;
+import io.netty.channel.ChannelFutureListener;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelInboundHandlerAdapter;
+import io.netty.util.AttributeKey;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.shardingsphere.authority.rule.AuthorityRule;
+import org.apache.shardingsphere.data.pipeline.cdc.common.CDCResponseErrorCode;
+import org.apache.shardingsphere.data.pipeline.cdc.constant.CDCConnectionStatus;
+import org.apache.shardingsphere.data.pipeline.cdc.context.CDCConnectionContext;
+import org.apache.shardingsphere.data.pipeline.cdc.generator.CDCResponseGenerator;
+import org.apache.shardingsphere.data.pipeline.cdc.proto.request.CDCRequest;
+import org.apache.shardingsphere.data.pipeline.cdc.proto.request.LoginRequest.BasicBody;
+import org.apache.shardingsphere.data.pipeline.cdc.proto.response.CDCResponse;
+import org.apache.shardingsphere.data.pipeline.cdc.proto.response.CDCResponse.Builder;
+import org.apache.shardingsphere.data.pipeline.cdc.proto.response.CreateSubscriptionResult;
+import org.apache.shardingsphere.data.pipeline.cdc.proto.response.ServerGreetingResult;
+import org.apache.shardingsphere.infra.autogen.version.ShardingSphereVersion;
+import org.apache.shardingsphere.infra.metadata.user.Grantee;
+import org.apache.shardingsphere.infra.metadata.user.ShardingSphereUser;
+import org.apache.shardingsphere.infra.rule.ShardingSphereRule;
+import org.apache.shardingsphere.proxy.backend.context.ProxyContext;
+
+import java.net.InetSocketAddress;
+import java.net.SocketAddress;
+import java.util.Collection;
+import java.util.Objects;
+import java.util.Optional;
+
+@Slf4j
+public final class CDCChannelInboundHandler extends ChannelInboundHandlerAdapter {

Review Comment:
   Class javadoc is required



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@shardingsphere.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org