You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@shardingsphere.apache.org by zh...@apache.org on 2022/12/05 06:35:27 UTC

[shardingsphere] branch master updated: Add CDC rule and login implemention (#22579)

This is an automated email from the ASF dual-hosted git repository.

zhonghongsheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new 25165109c17 Add CDC rule and login implemention (#22579)
25165109c17 is described below

commit 25165109c1771f52eeaae715ee4c5f49ba620f47
Author: Xinze Guo <10...@users.noreply.github.com>
AuthorDate: Mon Dec 5 14:35:18 2022 +0800

    Add CDC rule and login implemention (#22579)
    
    * Add cdc protocol
    
    * Add cdc rule and connection context
    
    * Add CDC server and handler, update server.yaml config
    
    * Add CDC client login implementation
    
    * Fix codestyle
    
    * Improve code
    
    * Add volatile
    
    * Remove logback dependency
---
 kernel/data-pipeline/cdc/client/pom.xml            |  10 ++
 .../data/pipeline/cdc/client/CDCClient.java        |  76 +++++++++++
 .../cdc/client/event/CreateSubscriptionEvent.java  |  24 ++++
 .../cdc/client/handler/LoginRequestHandler.java    |  96 +++++++++++++
 .../client/handler/SubscriptionRequestHandler.java |  84 ++++++++++++
 .../pipeline/cdc/client/util/RequestIdUtil.java    |  39 ++++++
 .../pipeline/cdc/config/CDCRuleConfiguration.java  |  34 +++++
 .../pipeline/cdc/constant/CDCConnectionStatus.java |  30 +++++
 .../data/pipeline/cdc/constant/CDCOrder.java       |  26 ++++
 .../pipeline/cdc/context/CDCConnectionContext.java |  32 +++++
 .../cdc/generator/CDCResponseGenerator.java        |  51 +++++++
 .../data/pipeline/cdc/rule/CDCRule.java            |  46 +++++++
 .../pipeline/cdc/rule/builder/CDCRuleBuilder.java  |  50 +++++++
 .../cdc/yaml/config/YamlCDCRuleConfiguration.java  |  41 ++++++
 .../swapper/YamlCDCRuleConfigurationSwapper.java   |  57 ++++++++
 ...ere.infra.rule.builder.global.GlobalRuleBuilder |  18 +++
 ...onfig.swapper.rule.YamlRuleConfigurationSwapper |  18 +++
 kernel/data-pipeline/cdc/protocol/pom.xml          |  41 ++++++
 .../pipeline/cdc/common/CDCResponseErrorCode.java  |  35 +++++
 .../src/main/proto/CDCRequestProtocol.proto        |  96 +++++++++++++
 .../src/main/proto/CDCResponseProtocol.proto       |  82 ++++++++++++
 .../cdc/protocol/src/main/resources/.gitkeep       |  18 +++
 pom.xml                                            |  16 ++-
 proxy/backend/pom.xml                              |   5 +
 .../backend/config/ProxyConfigurationLoader.java   |   3 +
 .../config/yaml/YamlProxyServerConfiguration.java  |   3 +
 .../config/ProxyConfigurationLoaderTest.java       |   1 +
 .../org/apache/shardingsphere/proxy/Bootstrap.java |   5 +
 .../bootstrap/src/main/resources/conf/server.yaml  |   4 +
 .../shardingsphere/proxy/frontend/CDCServer.java   | 101 ++++++++++++++
 .../frontend/netty/CDCChannelInboundHandler.java   | 148 +++++++++++++++++++++
 .../netty/CDCServerHandlerInitializer.java         |  41 ++++++
 .../netty/CDCChannelInboundHandlerTest.java        | 131 ++++++++++++++++++
 .../netty/CDCServerHandlerInitializerTest.java     |  48 +++++++
 34 files changed, 1509 insertions(+), 1 deletion(-)

diff --git a/kernel/data-pipeline/cdc/client/pom.xml b/kernel/data-pipeline/cdc/client/pom.xml
index 2da0211d90b..4277e618b8d 100644
--- a/kernel/data-pipeline/cdc/client/pom.xml
+++ b/kernel/data-pipeline/cdc/client/pom.xml
@@ -33,5 +33,15 @@
             <artifactId>shardingsphere-data-pipeline-cdc-protocol</artifactId>
             <version>${project.version}</version>
         </dependency>
+        <dependency>
+            <groupId>io.netty</groupId>
+            <artifactId>netty-buffer</artifactId>
+            <version>${netty.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>io.netty</groupId>
+            <artifactId>netty-codec</artifactId>
+            <version>${netty.version}</version>
+        </dependency>
     </dependencies>
 </project>
diff --git a/kernel/data-pipeline/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/CDCClient.java b/kernel/data-pipeline/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/CDCClient.java
new file mode 100644
index 00000000000..03c9a510a0c
--- /dev/null
+++ b/kernel/data-pipeline/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/CDCClient.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.data.pipeline.cdc.client;
+
+import io.netty.bootstrap.Bootstrap;
+import io.netty.buffer.PooledByteBufAllocator;
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelInitializer;
+import io.netty.channel.ChannelOption;
+import io.netty.channel.nio.NioEventLoopGroup;
+import io.netty.channel.socket.nio.NioSocketChannel;
+import io.netty.handler.codec.protobuf.ProtobufDecoder;
+import io.netty.handler.codec.protobuf.ProtobufEncoder;
+import io.netty.handler.codec.protobuf.ProtobufVarint32FrameDecoder;
+import io.netty.handler.codec.protobuf.ProtobufVarint32LengthFieldPrepender;
+import lombok.SneakyThrows;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.shardingsphere.data.pipeline.cdc.client.handler.LoginRequestHandler;
+import org.apache.shardingsphere.data.pipeline.cdc.client.handler.SubscriptionRequestHandler;
+import org.apache.shardingsphere.data.pipeline.cdc.protocol.response.CDCResponse;
+
+/**
+ * CDC client.
+ *
+ * <p>Connects to a CDC server through Netty. Inbound frames are varint32 length-prefixed protobuf messages decoded as
+ * {@link CDCResponse}; outbound messages are protobuf-encoded with the same kind of length prefix.</p>
+ */
+@Slf4j
+public final class CDCClient {
+    
+    /**
+     * Start ShardingSphere CDC client.
+     *
+     * <p>This method blocks the calling thread until the channel to the server has been closed.</p>
+     *
+     * @param address address of the CDC server
+     * @param port port of the CDC server
+     */
+    @SneakyThrows(InterruptedException.class)
+    public void start(final String address, final int port) {
+        startInternal(address, port);
+    }
+    
+    private void startInternal(final String address, final int port) throws InterruptedException {
+        NioEventLoopGroup group = new NioEventLoopGroup();
+        try {
+            Bootstrap bootstrap = new Bootstrap();
+            bootstrap.channel(NioSocketChannel.class)
+                    .group(group)
+                    .option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
+                    .option(ChannelOption.SO_REUSEADDR, true)
+                    .handler(new ChannelInitializer<NioSocketChannel>() {
+                        
+                        @Override
+                        protected void initChannel(final NioSocketChannel channel) {
+                            channel.pipeline().addLast(new ProtobufVarint32FrameDecoder());
+                            channel.pipeline().addLast(new ProtobufDecoder(CDCResponse.getDefaultInstance()));
+                            channel.pipeline().addLast(new ProtobufVarint32LengthFieldPrepender());
+                            channel.pipeline().addLast(new ProtobufEncoder());
+                            // TODO username and password are read from the configuration file or args
+                            channel.pipeline().addLast(new LoginRequestHandler("root", "root"));
+                            channel.pipeline().addLast(new SubscriptionRequestHandler());
+                        }
+                    });
+            ChannelFuture future = bootstrap.connect(address, port).sync();
+            future.channel().closeFuture().sync();
+        } finally {
+            // Release the event loop threads once the connection is over (or could not be established),
+            // otherwise they stay alive after this method has returned.
+            group.shutdownGracefully();
+        }
+    }
+}
diff --git a/kernel/data-pipeline/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/event/CreateSubscriptionEvent.java b/kernel/data-pipeline/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/event/CreateSubscriptionEvent.java
new file mode 100644
index 00000000000..59bb67e898c
--- /dev/null
+++ b/kernel/data-pipeline/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/event/CreateSubscriptionEvent.java
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.data.pipeline.cdc.client.event;
+
+/**
+ * Create subscription event.
+ *
+ * <p>Stateless marker object. In the CDC client it is fired as a Netty user event by {@code LoginRequestHandler}
+ * after a successful login, and {@code SubscriptionRequestHandler} reacts to it by sending a create subscription request.</p>
+ */
+public final class CreateSubscriptionEvent {
+}
diff --git a/kernel/data-pipeline/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/handler/LoginRequestHandler.java b/kernel/data-pipeline/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/handler/LoginRequestHandler.java
new file mode 100644
index 00000000000..435862b143a
--- /dev/null
+++ b/kernel/data-pipeline/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/handler/LoginRequestHandler.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.data.pipeline.cdc.client.handler;
+
+import com.google.common.hash.Hashing;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelInboundHandlerAdapter;
+import io.netty.util.AttributeKey;
+import lombok.RequiredArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.shardingsphere.data.pipeline.cdc.client.event.CreateSubscriptionEvent;
+import org.apache.shardingsphere.data.pipeline.cdc.client.util.RequestIdUtil;
+import org.apache.shardingsphere.data.pipeline.cdc.protocol.request.CDCRequest;
+import org.apache.shardingsphere.data.pipeline.cdc.protocol.request.CDCRequest.Type;
+import org.apache.shardingsphere.data.pipeline.cdc.protocol.request.LoginRequest;
+import org.apache.shardingsphere.data.pipeline.cdc.protocol.request.LoginRequest.BasicBody;
+import org.apache.shardingsphere.data.pipeline.cdc.protocol.request.LoginRequest.LoginType;
+import org.apache.shardingsphere.data.pipeline.cdc.protocol.response.CDCResponse;
+import org.apache.shardingsphere.data.pipeline.cdc.protocol.response.CDCResponse.Status;
+import org.apache.shardingsphere.data.pipeline.cdc.protocol.response.ServerGreetingResult;
+
+import java.nio.charset.StandardCharsets;
+import java.util.Objects;
+
+/**
+ * Login request handler.
+ *
+ * <p>Client-side handler that drives the login handshake. It waits for the server greeting, answers with a basic
+ * login request carrying the user name and the upper-case hex SHA-256 digest of the password, and, once the server
+ * reports success for the matching request id, fires a {@link CreateSubscriptionEvent} to the following handlers.
+ * After a successful login every inbound message is handed to the next handler untouched.</p>
+ */
+@Slf4j
+@RequiredArgsConstructor
+public final class LoginRequestHandler extends ChannelInboundHandlerAdapter {
+    
+    private static final AttributeKey<String> LOGIN_REQUEST_ID_KEY = AttributeKey.valueOf("login.request.id");
+    
+    private final String username;
+    
+    private final String password;
+    
+    private boolean loggedIn;
+    
+    @Override
+    public void channelInactive(final ChannelHandlerContext ctx) {
+        ctx.channel().attr(LOGIN_REQUEST_ID_KEY).set(null);
+        // Keep propagating the event so that handlers further along the pipeline still see the channel going inactive.
+        ctx.fireChannelInactive();
+    }
+    
+    @Override
+    public void channelRead(final ChannelHandlerContext ctx, final Object msg) {
+        if (loggedIn) {
+            ctx.fireChannelRead(msg);
+            return;
+        }
+        CDCResponse response = (CDCResponse) msg;
+        if (response.hasServerGreetingResult()) {
+            ServerGreetingResult serverGreetingResult = response.getServerGreetingResult();
+            log.info("Server greeting result, server version: {}, min protocol version: {}", serverGreetingResult.getServerVersion(), serverGreetingResult.getProtocolVersion());
+            sendLoginRequest(ctx);
+            return;
+        }
+        if (Status.FAILED == response.getStatus()) {
+            log.error("login failed, {}", msg);
+            return;
+        }
+        // Only a success response that carries the id of the login request that was sent counts as a successful login.
+        if (Status.SUCCEED == response.getStatus() && Objects.equals(ctx.channel().attr(LOGIN_REQUEST_ID_KEY).get(), response.getRequestId())) {
+            log.info("login success, username {}", username);
+            loggedIn = true;
+            ctx.fireUserEventTriggered(new CreateSubscriptionEvent());
+        }
+    }
+    
+    private void sendLoginRequest(final ChannelHandlerContext ctx) {
+        // Use a fixed charset so that the digest does not depend on the platform default encoding of the client machine.
+        String encryptPassword = Hashing.sha256().hashBytes(password.getBytes(StandardCharsets.UTF_8)).toString().toUpperCase();
+        LoginRequest loginRequest = LoginRequest.newBuilder().setType(LoginType.BASIC).setBasicBody(BasicBody.newBuilder().setUsername(username).setPassword(encryptPassword).build()).build();
+        String loginRequestId = RequestIdUtil.generateRequestId();
+        // Always overwrite: the stored id has to be the id of the request that is actually sent, otherwise the response could never be matched.
+        ctx.channel().attr(LOGIN_REQUEST_ID_KEY).set(loginRequestId);
+        CDCRequest data = CDCRequest.newBuilder().setType(Type.LOGIN).setVersion(1).setRequestId(loginRequestId).setLogin(loginRequest).build();
+        ctx.writeAndFlush(data);
+    }
+    
+    @Override
+    public void exceptionCaught(final ChannelHandlerContext ctx, final Throwable cause) {
+        log.error("login handler error", cause);
+    }
+}
diff --git a/kernel/data-pipeline/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/handler/SubscriptionRequestHandler.java b/kernel/data-pipeline/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/handler/SubscriptionRequestHandler.java
new file mode 100644
index 00000000000..3fa4cb4bb81
--- /dev/null
+++ b/kernel/data-pipeline/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/handler/SubscriptionRequestHandler.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.data.pipeline.cdc.client.handler;
+
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelInboundHandlerAdapter;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.shardingsphere.data.pipeline.cdc.client.event.CreateSubscriptionEvent;
+import org.apache.shardingsphere.data.pipeline.cdc.client.util.RequestIdUtil;
+import org.apache.shardingsphere.data.pipeline.cdc.protocol.request.CDCRequest;
+import org.apache.shardingsphere.data.pipeline.cdc.protocol.request.CDCRequest.Builder;
+import org.apache.shardingsphere.data.pipeline.cdc.protocol.request.CreateSubscriptionRequest;
+import org.apache.shardingsphere.data.pipeline.cdc.protocol.request.CreateSubscriptionRequest.SubscriptionMode;
+import org.apache.shardingsphere.data.pipeline.cdc.protocol.request.CreateSubscriptionRequest.TableName;
+import org.apache.shardingsphere.data.pipeline.cdc.protocol.request.StartSubscriptionRequest;
+import org.apache.shardingsphere.data.pipeline.cdc.protocol.response.CDCResponse;
+import org.apache.shardingsphere.data.pipeline.cdc.protocol.response.CDCResponse.Status;
+
+/**
+ * Subscription request handler.
+ *
+ * <p>Client-side handler that sends a create subscription request when a {@link CreateSubscriptionEvent} is triggered,
+ * and sends a start subscription request once the server reports that the subscription has been created.</p>
+ */
+@Slf4j
+public final class SubscriptionRequestHandler extends ChannelInboundHandlerAdapter {
+    
+    @Override
+    public void userEventTriggered(final ChannelHandlerContext ctx, final Object evt) {
+        if (evt instanceof CreateSubscriptionEvent) {
+            CDCRequest request = CDCRequest.newBuilder().setCreateSubscription(buildCreateSubscriptionRequest()).setRequestId(RequestIdUtil.generateRequestId()).build();
+            ctx.writeAndFlush(request);
+        }
+    }
+    
+    private CreateSubscriptionRequest buildCreateSubscriptionRequest() {
+        // TODO the parameter shouldn't hard code, will be fixed when completed
+        TableName tableName = TableName.newBuilder().build();
+        return CreateSubscriptionRequest.newBuilder().setSubscriptionMode(SubscriptionMode.INCREMENTAL).setSubscriptionName("sharding_db").setDatabase("sharding_db")
+                .addTableNames(tableName).build();
+    }
+    
+    @Override
+    public void channelRead(final ChannelHandlerContext ctx, final Object msg) {
+        CDCResponse response = (CDCResponse) msg;
+        if (Status.SUCCEED == response.getStatus()) {
+            processSucceed(ctx, response);
+        } else {
+            log.error("subscription response error {}", msg);
+        }
+    }
+    
+    private void processSucceed(final ChannelHandlerContext ctx, final CDCResponse response) {
+        if (response.hasCreateSubscriptionResult()) {
+            String subscriptionName = response.getCreateSubscriptionResult().getSubscriptionName();
+            log.info("create subscription succeed, subscription name {}", subscriptionName);
+            // The subscription exists now, so ask the server to start it under the name it reported back.
+            Builder builder = CDCRequest.newBuilder();
+            builder.setStartSubscription(buildStartSubscriptionRequest(subscriptionName));
+            builder.setRequestId(RequestIdUtil.generateRequestId());
+            ctx.writeAndFlush(builder.build());
+        }
+        // TODO waiting for pipeline refactoring finished
+    }
+    
+    private StartSubscriptionRequest buildStartSubscriptionRequest(final String subscriptionName) {
+        return StartSubscriptionRequest.newBuilder().setSubscriptionName(subscriptionName).build();
+    }
+    
+    @Override
+    public void exceptionCaught(final ChannelHandlerContext ctx, final Throwable cause) {
+        log.error("subscription handler error", cause);
+    }
+}
diff --git a/kernel/data-pipeline/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/util/RequestIdUtil.java b/kernel/data-pipeline/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/util/RequestIdUtil.java
new file mode 100644
index 00000000000..0c7f0a94dfa
--- /dev/null
+++ b/kernel/data-pipeline/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/util/RequestIdUtil.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.data.pipeline.cdc.client.util;
+
+import lombok.AccessLevel;
+import lombok.NoArgsConstructor;
+
+import java.util.UUID;
+
+/**
+ * Request id util.
+ *
+ * <p>Static helper only; the Lombok-generated constructor is private, so the class cannot be instantiated.</p>
+ */
+@NoArgsConstructor(access = AccessLevel.PRIVATE)
+public final class RequestIdUtil {
+    
+    /**
+     * Generate request id.
+     *
+     * <p>Each call returns the string form of a newly created random {@link UUID}.</p>
+     *
+     * @return request id
+     */
+    public static String generateRequestId() {
+        return UUID.randomUUID().toString();
+    }
+}
diff --git a/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/config/CDCRuleConfiguration.java b/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/config/CDCRuleConfiguration.java
new file mode 100644
index 00000000000..52c63af066e
--- /dev/null
+++ b/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/config/CDCRuleConfiguration.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.data.pipeline.cdc.config;
+
+import lombok.Getter;
+import lombok.RequiredArgsConstructor;
+import org.apache.shardingsphere.infra.config.rule.scope.GlobalRuleConfiguration;
+
+/**
+ * CDC rule configuration.
+ *
+ * <p>Immutable global rule configuration carrying two values: {@code enabled} and {@code port}. Lombok generates the
+ * all-arguments constructor and the getters {@code isEnabled()} and {@code getPort()}.
+ * NOTE(review): presumably {@code enabled} switches the CDC server on and {@code port} is the port it listens on;
+ * confirm against the code that consumes this configuration.</p>
+ */
+@RequiredArgsConstructor
+@Getter
+public final class CDCRuleConfiguration implements GlobalRuleConfiguration {
+    
+    private final boolean enabled;
+    
+    private final int port;
+}
diff --git a/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/constant/CDCConnectionStatus.java b/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/constant/CDCConnectionStatus.java
new file mode 100644
index 00000000000..09ce0e484db
--- /dev/null
+++ b/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/constant/CDCConnectionStatus.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.data.pipeline.cdc.constant;
+
+/**
+ * CDC connection status.
+ *
+ * <p>Declares the states {@code NOT_LOGGED_IN}, {@code LOGGED_IN} and {@code SUBSCRIBED}.
+ * NOTE(review): presumably a connection moves through them in this order; confirm against the handler
+ * that updates the status.</p>
+ */
+public enum CDCConnectionStatus {
+    
+    NOT_LOGGED_IN,
+    
+    LOGGED_IN,
+    
+    SUBSCRIBED
+}
diff --git a/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/constant/CDCOrder.java b/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/constant/CDCOrder.java
new file mode 100644
index 00000000000..7fc35073699
--- /dev/null
+++ b/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/constant/CDCOrder.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.data.pipeline.cdc.constant;
+
+/**
+ * CDC rule order of load.
+ */
+public final class CDCOrder {
+    
+    public static final int ORDER = 10000;
+}
diff --git a/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/context/CDCConnectionContext.java b/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/context/CDCConnectionContext.java
new file mode 100644
index 00000000000..2b74ddfd816
--- /dev/null
+++ b/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/context/CDCConnectionContext.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.data.pipeline.cdc.context;
+
+import lombok.Getter;
+import lombok.Setter;
+import org.apache.shardingsphere.data.pipeline.cdc.constant.CDCConnectionStatus;
+
+/**
+ * CDC connection context.
+ *
+ * <p>Holds the {@link CDCConnectionStatus} of a connection in a {@code volatile} field with Lombok-generated getter
+ * and setter. No initial value is assigned here, so the status is {@code null} until {@code setStatus} is called.</p>
+ */
+@Getter
+public final class CDCConnectionContext {
+    
+    @Setter
+    private volatile CDCConnectionStatus status;
+}
diff --git a/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/generator/CDCResponseGenerator.java b/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/generator/CDCResponseGenerator.java
new file mode 100644
index 00000000000..dc24261e26b
--- /dev/null
+++ b/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/generator/CDCResponseGenerator.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.data.pipeline.cdc.generator;
+
+import org.apache.shardingsphere.data.pipeline.cdc.common.CDCResponseErrorCode;
+import org.apache.shardingsphere.data.pipeline.cdc.protocol.response.CDCResponse;
+import org.apache.shardingsphere.data.pipeline.cdc.protocol.response.CDCResponse.Builder;
+import org.apache.shardingsphere.data.pipeline.cdc.protocol.response.CDCResponse.Status;
+
+/**
+ * CDC response message generator.
+ *
+ * <p>Static factory methods only, therefore the class cannot be instantiated.</p>
+ */
+public final class CDCResponseGenerator {
+    
+    private CDCResponseGenerator() {
+    }
+    
+    /**
+     * Succeed response builder.
+     *
+     * <p>The returned builder already has the status {@code SUCCEED} and the request id set, so that the caller
+     * can add a result body before building the response.</p>
+     *
+     * @param requestId request id
+     * @return succeed response builder
+     */
+    public static Builder succeedBuilder(final String requestId) {
+        return CDCResponse.newBuilder().setStatus(Status.SUCCEED).setRequestId(requestId);
+    }
+    
+    /**
+     * Failed response.
+     *
+     * @param requestId request id
+     * @param errorCode error code
+     * @param errorMessage error message
+     * @return failed response
+     */
+    public static CDCResponse failed(final String requestId, final CDCResponseErrorCode errorCode, final String errorMessage) {
+        return CDCResponse.newBuilder().setStatus(Status.FAILED).setRequestId(requestId).setErrorCode(errorCode.getCode()).setErrorMessage(errorMessage).build();
+    }
+}
diff --git a/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/rule/CDCRule.java b/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/rule/CDCRule.java
new file mode 100644
index 00000000000..2f2b21df21f
--- /dev/null
+++ b/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/rule/CDCRule.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.data.pipeline.cdc.rule;
+
+import lombok.Getter;
+import lombok.RequiredArgsConstructor;
+import org.apache.shardingsphere.data.pipeline.cdc.config.CDCRuleConfiguration;
+import org.apache.shardingsphere.infra.config.rule.RuleConfiguration;
+import org.apache.shardingsphere.infra.rule.identifier.scope.GlobalRule;
+
+/**
+ * CDC rule.
+ *
+ * <p>Global rule holding an {@code enable} flag and a {@code port}. {@code getConfiguration()} creates a new
+ * {@link CDCRuleConfiguration} from these two values on every call, and {@code getType()} returns the simple
+ * class name of this rule.
+ * NOTE(review): the field is named {@code enable} here but {@code enabled} in {@link CDCRuleConfiguration}, so the
+ * Lombok getters differ ({@code isEnable()} versus {@code isEnabled()}).</p>
+ */
+@RequiredArgsConstructor
+@Getter
+public final class CDCRule implements GlobalRule {
+    
+    private final boolean enable;
+    
+    private final int port;
+    
+    @Override
+    public RuleConfiguration getConfiguration() {
+        return new CDCRuleConfiguration(enable, port);
+    }
+    
+    @Override
+    public String getType() {
+        return CDCRule.class.getSimpleName();
+    }
+}
diff --git a/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/rule/builder/CDCRuleBuilder.java b/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/rule/builder/CDCRuleBuilder.java
new file mode 100644
index 00000000000..d4f38ea5b48
--- /dev/null
+++ b/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/rule/builder/CDCRuleBuilder.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.data.pipeline.cdc.rule.builder;
+
+import org.apache.shardingsphere.data.pipeline.cdc.config.CDCRuleConfiguration;
+import org.apache.shardingsphere.data.pipeline.cdc.constant.CDCOrder;
+import org.apache.shardingsphere.data.pipeline.cdc.rule.CDCRule;
+import org.apache.shardingsphere.infra.config.props.ConfigurationProperties;
+import org.apache.shardingsphere.infra.instance.InstanceContext;
+import org.apache.shardingsphere.infra.metadata.database.ShardingSphereDatabase;
+import org.apache.shardingsphere.infra.rule.builder.global.GlobalRuleBuilder;
+
+import java.util.Map;
+
+/**
+ * CDC rule builder.
+ */
+public final class CDCRuleBuilder implements GlobalRuleBuilder<CDCRuleConfiguration> {
+    
+    @Override
+    public CDCRule build(final CDCRuleConfiguration ruleConfig, final Map<String, ShardingSphereDatabase> databases, final InstanceContext instanceContext,
+                         final ConfigurationProperties props) {
+        return new CDCRule(ruleConfig.isEnabled(), ruleConfig.getPort());
+    }
+    
+    @Override
+    public int getOrder() {
+        return CDCOrder.ORDER;
+    }
+    
+    @Override
+    public Class<CDCRuleConfiguration> getTypeClass() {
+        return CDCRuleConfiguration.class;
+    }
+}
diff --git a/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/yaml/config/YamlCDCRuleConfiguration.java b/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/yaml/config/YamlCDCRuleConfiguration.java
new file mode 100644
index 00000000000..7768ee61a98
--- /dev/null
+++ b/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/yaml/config/YamlCDCRuleConfiguration.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.data.pipeline.cdc.yaml.config;
+
+import lombok.Getter;
+import lombok.Setter;
+import org.apache.shardingsphere.data.pipeline.cdc.config.CDCRuleConfiguration;
+import org.apache.shardingsphere.infra.config.rule.RuleConfiguration;
+import org.apache.shardingsphere.infra.yaml.config.pojo.rule.YamlRuleConfiguration;
+
+/**
+ * CDC configuration for YAML.
+ *
+ * <p>Holds two properties exposed through Lombok-generated getters and setters:
+ * {@code enabled}, which stays {@code false} unless it is set, and {@code port},
+ * which stays {@code 33071} unless it is set. {@link #getRuleConfigurationType()}
+ * reports {@link CDCRuleConfiguration} as the rule configuration type this YAML
+ * configuration corresponds to.</p>
+ */
+@Getter
+@Setter
+public final class YamlCDCRuleConfiguration implements YamlRuleConfiguration {
+    
+    private boolean enabled;
+    
+    private int port = 33071;
+    
+    @Override
+    public Class<? extends RuleConfiguration> getRuleConfigurationType() {
+        return CDCRuleConfiguration.class;
+    }
+}
diff --git a/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/yaml/swapper/YamlCDCRuleConfigurationSwapper.java b/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/yaml/swapper/YamlCDCRuleConfigurationSwapper.java
new file mode 100644
index 00000000000..7ef181271bf
--- /dev/null
+++ b/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/yaml/swapper/YamlCDCRuleConfigurationSwapper.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.data.pipeline.cdc.yaml.swapper;
+
+import org.apache.shardingsphere.data.pipeline.cdc.config.CDCRuleConfiguration;
+import org.apache.shardingsphere.data.pipeline.cdc.constant.CDCOrder;
+import org.apache.shardingsphere.data.pipeline.cdc.yaml.config.YamlCDCRuleConfiguration;
+import org.apache.shardingsphere.infra.yaml.config.swapper.rule.YamlRuleConfigurationSwapper;
+
+/**
+ * YAML CDC rule configuration swapper.
+ */
+public final class YamlCDCRuleConfigurationSwapper implements YamlRuleConfigurationSwapper<YamlCDCRuleConfiguration, CDCRuleConfiguration> {
+    
+    @Override
+    public YamlCDCRuleConfiguration swapToYamlConfiguration(final CDCRuleConfiguration data) {
+        YamlCDCRuleConfiguration result = new YamlCDCRuleConfiguration();
+        result.setPort(null == data ? 33071 : data.getPort());
+        result.setEnabled(null != data && data.isEnabled());
+        return result;
+    }
+    
+    @Override
+    public CDCRuleConfiguration swapToObject(final YamlCDCRuleConfiguration yamlConfig) {
+        return new CDCRuleConfiguration(yamlConfig.isEnabled(), yamlConfig.getPort());
+    }
+    
+    @Override
+    public Class<CDCRuleConfiguration> getTypeClass() {
+        return CDCRuleConfiguration.class;
+    }
+    
+    @Override
+    public String getRuleTagName() {
+        return "CDC";
+    }
+    
+    @Override
+    public int getOrder() {
+        return CDCOrder.ORDER;
+    }
+}
diff --git a/kernel/data-pipeline/cdc/core/src/main/resources/META-INF/services/org.apache.shardingsphere.infra.rule.builder.global.GlobalRuleBuilder b/kernel/data-pipeline/cdc/core/src/main/resources/META-INF/services/org.apache.shardingsphere.infra.rule.builder.global.GlobalRuleBuilder
new file mode 100644
index 00000000000..8bf328f3517
--- /dev/null
+++ b/kernel/data-pipeline/cdc/core/src/main/resources/META-INF/services/org.apache.shardingsphere.infra.rule.builder.global.GlobalRuleBuilder
@@ -0,0 +1,18 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+org.apache.shardingsphere.data.pipeline.cdc.rule.builder.CDCRuleBuilder
diff --git a/kernel/data-pipeline/cdc/core/src/main/resources/META-INF/services/org.apache.shardingsphere.infra.yaml.config.swapper.rule.YamlRuleConfigurationSwapper b/kernel/data-pipeline/cdc/core/src/main/resources/META-INF/services/org.apache.shardingsphere.infra.yaml.config.swapper.rule.YamlRuleConfigurationSwapper
new file mode 100644
index 00000000000..526ecb96c8c
--- /dev/null
+++ b/kernel/data-pipeline/cdc/core/src/main/resources/META-INF/services/org.apache.shardingsphere.infra.yaml.config.swapper.rule.YamlRuleConfigurationSwapper
@@ -0,0 +1,18 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+org.apache.shardingsphere.data.pipeline.cdc.yaml.swapper.YamlCDCRuleConfigurationSwapper
diff --git a/kernel/data-pipeline/cdc/protocol/pom.xml b/kernel/data-pipeline/cdc/protocol/pom.xml
index 58d17e6e585..7460d31d940 100644
--- a/kernel/data-pipeline/cdc/protocol/pom.xml
+++ b/kernel/data-pipeline/cdc/protocol/pom.xml
@@ -27,4 +27,45 @@
     <artifactId>shardingsphere-data-pipeline-cdc-protocol</artifactId>
     <name>${project.artifactId}</name>
     
+    <dependencies>
+        <dependency>
+            <groupId>com.google.protobuf</groupId>
+            <artifactId>protobuf-java</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>com.google.protobuf</groupId>
+            <artifactId>protobuf-java-util</artifactId>
+        </dependency>
+    </dependencies>
+    
+    <build>
+        <plugins>
+            <plugin>
+                <groupId>org.xolstice.maven.plugins</groupId>
+                <artifactId>protobuf-maven-plugin</artifactId>
+                <version>${protobuf-maven-plugin.version}</version>
+                <configuration>
+                    <additionalProtoPathElements>
+                        <additionalProtoPathElement>${project.basedir}/src/main/resources</additionalProtoPathElement>
+                    </additionalProtoPathElements>
+                    <protocArtifact>com.google.protobuf:protoc:${protobuf-java.version}:exe:${os.detected.classifier}</protocArtifact>
+                </configuration>
+                <executions>
+                    <execution>
+                        <goals>
+                            <goal>compile</goal>
+                            <goal>test-compile</goal>
+                        </goals>
+                    </execution>
+                </executions>
+            </plugin>
+        </plugins>
+        <extensions>
+            <extension>
+                <groupId>kr.motd.maven</groupId>
+                <artifactId>os-maven-plugin</artifactId>
+                <version>${os-maven-plugin.version}</version>
+            </extension>
+        </extensions>
+    </build>
 </project>
diff --git a/kernel/data-pipeline/cdc/protocol/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/common/CDCResponseErrorCode.java b/kernel/data-pipeline/cdc/protocol/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/common/CDCResponseErrorCode.java
new file mode 100644
index 00000000000..45d57900720
--- /dev/null
+++ b/kernel/data-pipeline/cdc/protocol/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/common/CDCResponseErrorCode.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.data.pipeline.cdc.common;
+
+import lombok.Getter;
+import lombok.RequiredArgsConstructor;
+
+/**
+ * Error codes for CDC responses. Each constant carries a string code that is
+ * readable through the Lombok-generated {@code getCode()} accessor.
+ */
+@Getter
+@RequiredArgsConstructor
+public enum CDCResponseErrorCode {
+    
+    SERVER_ERROR("1"),
+    
+    ILLEGAL_REQUEST_ERROR("2");
+    
+    private final String code;
+}
diff --git a/kernel/data-pipeline/cdc/protocol/src/main/proto/CDCRequestProtocol.proto b/kernel/data-pipeline/cdc/protocol/src/main/proto/CDCRequestProtocol.proto
new file mode 100644
index 00000000000..b26bcc9dd52
--- /dev/null
+++ b/kernel/data-pipeline/cdc/protocol/src/main/proto/CDCRequestProtocol.proto
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+syntax = "proto3";
+
+option java_multiple_files = true;
+option java_outer_classname = "CDCRequestProtocol";
+option java_package = "org.apache.shardingsphere.data.pipeline.cdc.protocol.request";
+
+message CDCRequest {  // Top-level request message: version, request id, type tag and at most one payload from the `request` oneof.
+  int32 version = 1;
+  string request_id = 2;  // Presumably matched against CDCResponse.request_id to pair a response with its request; confirm with the handlers.
+  enum Type {  // Kind of operation requested; UNKNOWN (0) is the value read when the field is unset.
+    UNKNOWN = 0;
+    LOGIN = 1;
+    CREATE_SUBSCRIPTION = 2;
+    START_SUBSCRIPTION = 3;
+    STOP_SUBSCRIPTION = 4;
+    DROP_SUBSCRIPTION = 5;
+    FETCH_RECORDS = 6;
+    FETCH_RECORDS_ACK = 7;
+  }
+  Type type = 3;
+  oneof request {  // NOTE(review): Type declares FETCH_RECORDS but no fetch payload is listed here; confirm that is intentional.
+    LoginRequest login = 4;
+    CreateSubscriptionRequest create_subscription = 5;
+    StartSubscriptionRequest start_subscription = 6;
+    AckRequest ack_request = 7;  // Presumably the payload for FETCH_RECORDS_ACK; confirm.
+    StopSubscriptionRequest stop_subscription = 8;
+    DropSubscriptionRequest drop_subscription = 9;
+  }
+}
+
+message LoginRequest {  // Login payload: a login type plus a type-specific body.
+  enum LoginType {
+    UNKNOWN = 0;
+    BASIC = 1;  // Presumably selects basic_body as the credential body; confirm against the login handler.
+  }
+  LoginType type = 1;
+  oneof body {  // Only one body variant is declared so far.
+    BasicBody basic_body = 2;
+  }
+
+  message BasicBody {  // Username and password credentials.
+    string username = 1;
+    string password = 2;  // NOTE(review): whether this is plain text or a digest is not visible in this file; confirm.
+  }
+}
+
+message CreateSubscriptionRequest {  // Payload for creating a named subscription over a list of tables in one database.
+  string database = 1;
+  message TableName {  // Table identifier made of a schema and a table name.
+    string schema = 1;
+    string name = 2;
+  }
+  repeated TableName tableNames = 2;
+  string subscriptionName = 3;
+  SubscriptionMode subscriptionMode = 4;
+
+  enum SubscriptionMode {  // NOTE(review): the exact meaning of INCREMENTAL vs FULL is not defined in this file; confirm with the server side.
+    UNKNOWN = 0;
+    INCREMENTAL = 1;
+    FULL = 2;
+  }
+}
+
+message StartSubscriptionRequest {  // Payload naming the subscription to start.
+  string subscriptionName = 1;
+}
+
+message StopSubscriptionRequest {  // Payload naming the subscription to stop.
+  string subscriptionName = 1;
+}
+
+message DropSubscriptionRequest {  // Payload naming the subscription to drop.
+  string subscriptionName = 1;
+}
+
+message AckRequest {  // Acknowledgement payload for a named subscription.
+  string subscriptionName = 1;
+  string lastReceivedPosition = 2;  // Presumably the position of the last record the client received; the format is not defined here, confirm.
+}
diff --git a/kernel/data-pipeline/cdc/protocol/src/main/proto/CDCResponseProtocol.proto b/kernel/data-pipeline/cdc/protocol/src/main/proto/CDCResponseProtocol.proto
new file mode 100644
index 00000000000..daeb01d9bf2
--- /dev/null
+++ b/kernel/data-pipeline/cdc/protocol/src/main/proto/CDCResponseProtocol.proto
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+syntax = "proto3";
+
+import "google/protobuf/any.proto";
+
+option java_multiple_files = true;
+option java_outer_classname = "CDCResponseProtocol";
+option java_package = "org.apache.shardingsphere.data.pipeline.cdc.protocol.response";
+
+message CDCResponse {  // Top-level response message: request id, status, at most one result payload, and optional error details.
+  string request_id = 1;
+  enum Status {  // Outcome of the request; UNKNOWN (0) is the value read when the field is unset.
+    UNKNOWN = 0;
+    SUCCEED = 1;
+    FAILED = 2;
+  }
+  Status status = 2;
+  oneof response {  // Result payload; at most one of these is set.
+    ServerGreetingResult server_greeting_result = 3;
+    CreateSubscriptionResult create_subscription_result = 4;
+    FetchRecordResult fetch_record_result = 5;
+  }
+
+  optional string error_code = 14;  // NOTE(review): presumably populated only when status is FAILED; confirm with the response generator.
+  optional string error_message = 15;
+}
+
+message ServerGreetingResult {  // Greeting payload carrying the server version and the protocol version as strings.
+  string server_version = 1;
+  string protocol_version = 2;
+}
+
+message CreateSubscriptionResult {  // Result payload for a create-subscription request.
+  string subscriptionName = 1;
+  bool existing = 2;  // Presumably true when a subscription with this name already existed; confirm with the server side.
+}
+
+// Result payload carrying a list of change records plus a flag telling whether more are available.
+message FetchRecordResult {
+  // One change record.
+  message Record {
+    // Values keyed by string, presumably column name; the Any payload types are not defined in this file, confirm.
+    map<string, google.protobuf.Any> before = 1;
+    map<string, google.protobuf.Any> after = 2;
+    // Location of the table the record belongs to; schema is optional.
+    message TableMetaData {
+      string database = 1;
+      optional string schema = 2;
+      string tableName = 3;
+    }
+    TableMetaData tableMetaData = 3;
+    int64 transaction_commit_millis = 4;
+    // Kind of change a record describes; UNKNOWN (0) is the value read when the field is unset.
+    enum DataChangeType {
+      UNKNOWN = 0;
+      INSERT = 1;
+      UPDATE = 2;
+      DELETE = 3;
+      CREATE_TABLE = 4;
+      ALTER_TABLE = 5;
+      DROP_TABLE = 6;
+      CREATE_INDEX = 7;
+      ALTER_INDEX = 8;
+      DROP_INDEX = 9;
+    }
+    // The enum above was declared but never used by any field, so a record could not carry its
+    // change type. Field number 5 was unassigned, so adding it here is wire-compatible.
+    DataChangeType dataChangeType = 5;
+    bool isDDL = 6;
+    optional string ddlSQL = 7;
+  }
+  repeated Record records = 1;
+  bool hasNext = 2;
+}
diff --git a/kernel/data-pipeline/cdc/protocol/src/main/resources/.gitkeep b/kernel/data-pipeline/cdc/protocol/src/main/resources/.gitkeep
new file mode 100644
index 00000000000..325be7c7905
--- /dev/null
+++ b/kernel/data-pipeline/cdc/protocol/src/main/resources/.gitkeep
@@ -0,0 +1,18 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# The protobuf-maven-plugin requires the resources directory to exist, so this placeholder file keeps it in the repository.
diff --git a/pom.xml b/pom.xml
index d0881f2fc14..bd930b317a8 100644
--- a/pom.xml
+++ b/pom.xml
@@ -121,6 +121,8 @@
         <hamcrest.version>1.3</hamcrest.version>
         <mockito.version>4.8.0</mockito.version>
         
+        <protobuf-java.version>3.17.1</protobuf-java.version>
+        
         <!-- Plugin versions -->
         <apache-rat-plugin.version>0.15</apache-rat-plugin.version>
         <maven-compiler-plugin.version>3.8.0</maven-compiler-plugin.version>
@@ -148,7 +150,8 @@
         <maven-pmd-plugin.version>3.5</maven-pmd-plugin.version>
         <jdepend-maven-plugin.version>2.0</jdepend-maven-plugin.version>
         <taglist-maven-plugin.version>2.4</taglist-maven-plugin.version>
-        <os-maven-plugin.version>1.5.0.Final</os-maven-plugin.version>
+        <os-maven-plugin.version>1.6.2</os-maven-plugin.version>
+        <protobuf-maven-plugin.version>0.6.1</protobuf-maven-plugin.version>
         <dockerfile-maven.version>1.4.13</dockerfile-maven.version>
         <docker-compose-maven-plugin.version>4.0.0</docker-compose-maven-plugin.version>
         <checksum-maven-plugin.version>1.10</checksum-maven-plugin.version>
@@ -555,6 +558,17 @@
                 <artifactId>jul-to-slf4j</artifactId>
                 <version>${slf4j.version}</version>
             </dependency>
+    
+            <dependency>
+                <groupId>com.google.protobuf</groupId>
+                <artifactId>protobuf-java</artifactId>
+                <version>${protobuf-java.version}</version>
+            </dependency>
+            <dependency>
+                <groupId>com.google.protobuf</groupId>
+                <artifactId>protobuf-java-util</artifactId>
+                <version>${protobuf-java.version}</version>
+            </dependency>
             
             <dependency>
                 <groupId>org.projectlombok</groupId>
diff --git a/proxy/backend/pom.xml b/proxy/backend/pom.xml
index d89528bfe1f..1572fec64b3 100644
--- a/proxy/backend/pom.xml
+++ b/proxy/backend/pom.xml
@@ -203,6 +203,11 @@
             <artifactId>shardingsphere-data-pipeline-distsql-handler</artifactId>
             <version>${project.version}</version>
         </dependency>
+        <dependency>
+            <groupId>org.apache.shardingsphere</groupId>
+            <artifactId>shardingsphere-data-pipeline-cdc-core</artifactId>
+            <version>${project.version}</version>
+        </dependency>
         <dependency>
             <groupId>org.apache.shardingsphere</groupId>
             <artifactId>shardingsphere-sql-federation-core</artifactId>
diff --git a/proxy/backend/src/main/java/org/apache/shardingsphere/proxy/backend/config/ProxyConfigurationLoader.java b/proxy/backend/src/main/java/org/apache/shardingsphere/proxy/backend/config/ProxyConfigurationLoader.java
index 0ea805d8b9e..d4a8f03b256 100644
--- a/proxy/backend/src/main/java/org/apache/shardingsphere/proxy/backend/config/ProxyConfigurationLoader.java
+++ b/proxy/backend/src/main/java/org/apache/shardingsphere/proxy/backend/config/ProxyConfigurationLoader.java
@@ -97,6 +97,9 @@ public final class ProxyConfigurationLoader {
         if (null != serverConfiguration.getTraffic()) {
             serverConfiguration.getRules().add(serverConfiguration.getTraffic());
         }
+        if (null != serverConfiguration.getCdc()) {
+            serverConfiguration.getRules().add(serverConfiguration.getCdc());
+        }
         return serverConfiguration;
     }
     
diff --git a/proxy/backend/src/main/java/org/apache/shardingsphere/proxy/backend/config/yaml/YamlProxyServerConfiguration.java b/proxy/backend/src/main/java/org/apache/shardingsphere/proxy/backend/config/yaml/YamlProxyServerConfiguration.java
index b3131836d43..f1d66d17178 100644
--- a/proxy/backend/src/main/java/org/apache/shardingsphere/proxy/backend/config/yaml/YamlProxyServerConfiguration.java
+++ b/proxy/backend/src/main/java/org/apache/shardingsphere/proxy/backend/config/yaml/YamlProxyServerConfiguration.java
@@ -20,6 +20,7 @@ package org.apache.shardingsphere.proxy.backend.config.yaml;
 import lombok.Getter;
 import lombok.Setter;
 import org.apache.shardingsphere.authority.yaml.config.YamlAuthorityConfiguration;
+import org.apache.shardingsphere.data.pipeline.cdc.yaml.config.YamlCDCRuleConfiguration;
 import org.apache.shardingsphere.infra.util.yaml.YamlConfiguration;
 import org.apache.shardingsphere.infra.yaml.config.pojo.mode.YamlModeConfiguration;
 import org.apache.shardingsphere.infra.yaml.config.pojo.rule.YamlRuleConfiguration;
@@ -51,6 +52,8 @@ public final class YamlProxyServerConfiguration implements YamlConfiguration {
     
     private YamlTrafficRuleConfiguration traffic;
     
+    private YamlCDCRuleConfiguration cdc;
+    
     private Collection<YamlRuleConfiguration> rules = new LinkedList<>();
     
     private Properties props = new Properties();
diff --git a/proxy/backend/src/test/java/org/apache/shardingsphere/proxy/backend/config/ProxyConfigurationLoaderTest.java b/proxy/backend/src/test/java/org/apache/shardingsphere/proxy/backend/config/ProxyConfigurationLoaderTest.java
index 6f1d50ac23a..2f77b9aa5fb 100644
--- a/proxy/backend/src/test/java/org/apache/shardingsphere/proxy/backend/config/ProxyConfigurationLoaderTest.java
+++ b/proxy/backend/src/test/java/org/apache/shardingsphere/proxy/backend/config/ProxyConfigurationLoaderTest.java
@@ -48,6 +48,7 @@ public final class ProxyConfigurationLoaderTest {
         YamlProxyServerConfiguration serverConfig = actual.getServerConfiguration();
         assertNull(serverConfig.getMode());
         assertNull(serverConfig.getAuthority());
+        assertNull(serverConfig.getCdc());
         assertNull(serverConfig.getLabels());
         assertTrue(serverConfig.getProps().isEmpty());
         assertTrue(serverConfig.getRules().isEmpty());
diff --git a/proxy/bootstrap/src/main/java/org/apache/shardingsphere/proxy/Bootstrap.java b/proxy/bootstrap/src/main/java/org/apache/shardingsphere/proxy/Bootstrap.java
index ef272b93003..483de595dae 100644
--- a/proxy/bootstrap/src/main/java/org/apache/shardingsphere/proxy/Bootstrap.java
+++ b/proxy/bootstrap/src/main/java/org/apache/shardingsphere/proxy/Bootstrap.java
@@ -24,6 +24,7 @@ import org.apache.shardingsphere.infra.config.props.ConfigurationPropertyKey;
 import org.apache.shardingsphere.proxy.arguments.BootstrapArguments;
 import org.apache.shardingsphere.proxy.backend.config.ProxyConfigurationLoader;
 import org.apache.shardingsphere.proxy.backend.config.YamlProxyConfiguration;
+import org.apache.shardingsphere.proxy.frontend.CDCServer;
 import org.apache.shardingsphere.proxy.frontend.ShardingSphereProxy;
 import org.apache.shardingsphere.proxy.initializer.BootstrapInitializer;
 
@@ -50,6 +51,10 @@ public final class Bootstrap {
         int port = bootstrapArgs.getPort().orElseGet(() -> new ConfigurationProperties(yamlConfig.getServerConfiguration().getProps()).getValue(ConfigurationPropertyKey.PROXY_DEFAULT_PORT));
         List<String> addresses = bootstrapArgs.getAddresses();
         new BootstrapInitializer().init(yamlConfig, port, bootstrapArgs.getForce());
+        boolean cdcEnabled = null != yamlConfig.getServerConfiguration().getCdc() && yamlConfig.getServerConfiguration().getCdc().isEnabled();
+        if (cdcEnabled) {
+            new CDCServer(addresses, yamlConfig.getServerConfiguration().getCdc().getPort()).start();
+        }
         new ShardingSphereProxy().start(port, addresses);
     }
 }
diff --git a/proxy/bootstrap/src/main/resources/conf/server.yaml b/proxy/bootstrap/src/main/resources/conf/server.yaml
index cbae3f3e249..166959eaa3c 100644
--- a/proxy/bootstrap/src/main/resources/conf/server.yaml
+++ b/proxy/bootstrap/src/main/resources/conf/server.yaml
@@ -55,6 +55,10 @@
 #    initialCapacity: 128
 #    maximumSize: 1024
 #
+#cdc:
+#  enabled: false
+#  port: 33071
+#
 #props:
 #  max-connections-size-per-query: 1
 #  kernel-executor-size: 16  # Infinite by default.
diff --git a/proxy/frontend/core/src/main/java/org/apache/shardingsphere/proxy/frontend/CDCServer.java b/proxy/frontend/core/src/main/java/org/apache/shardingsphere/proxy/frontend/CDCServer.java
new file mode 100644
index 00000000000..953de6168e4
--- /dev/null
+++ b/proxy/frontend/core/src/main/java/org/apache/shardingsphere/proxy/frontend/CDCServer.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.proxy.frontend;
+
+import io.netty.bootstrap.ServerBootstrap;
+import io.netty.buffer.PooledByteBufAllocator;
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelOption;
+import io.netty.channel.EventLoopGroup;
+import io.netty.channel.WriteBufferWaterMark;
+import io.netty.channel.epoll.Epoll;
+import io.netty.channel.epoll.EpollEventLoopGroup;
+import io.netty.channel.epoll.EpollServerSocketChannel;
+import io.netty.channel.nio.NioEventLoopGroup;
+import io.netty.channel.socket.nio.NioServerSocketChannel;
+import io.netty.handler.logging.LogLevel;
+import io.netty.handler.logging.LoggingHandler;
+import lombok.RequiredArgsConstructor;
+import lombok.SneakyThrows;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.shardingsphere.proxy.frontend.netty.CDCServerHandlerInitializer;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * CDC server thread. When run, it binds a Netty server on every configured address at the
+ * configured port, blocks until each bound channel is closed, and then shuts both event loop
+ * groups down.
+ */
+@RequiredArgsConstructor
+@Slf4j
+public final class CDCServer extends Thread {
+    
+    private final List<String> addresses;
+    
+    private final int port;
+    
+    private EventLoopGroup bossGroup;
+    
+    private EventLoopGroup workerGroup;
+    
+    /**
+     * Start the server and wait for every bound channel to close. The event loop groups are
+     * shut down in all cases, including when binding or waiting fails.
+     */
+    @Override
+    @SneakyThrows(InterruptedException.class)
+    public void run() {
+        try {
+            for (ChannelFuture each : startInternal(addresses, port)) {
+                each.channel().closeFuture().sync();
+            }
+        } finally {
+            close();
+        }
+    }
+    
+    /**
+     * Configure the server bootstrap and bind it on each address.
+     *
+     * @param bindAddresses addresses to bind
+     * @param bindPort port to bind on every address
+     * @return one completed bind future per address, in iteration order
+     * @throws InterruptedException if the thread is interrupted while waiting for a bind
+     */
+    private List<ChannelFuture> startInternal(final List<String> bindAddresses, final int bindPort) throws InterruptedException {
+        createEventLoopGroup();
+        ServerBootstrap bootstrap = new ServerBootstrap();
+        // Each configuration call returns the same bootstrap, so separate statements keep the original call order.
+        bootstrap.channel(Epoll.isAvailable() ? EpollServerSocketChannel.class : NioServerSocketChannel.class);
+        bootstrap.group(bossGroup, workerGroup);
+        bootstrap.option(ChannelOption.SO_REUSEADDR, true);
+        bootstrap.option(ChannelOption.WRITE_BUFFER_WATER_MARK, new WriteBufferWaterMark(8 * 1024 * 1024, 16 * 1024 * 1024));
+        bootstrap.childOption(ChannelOption.SO_KEEPALIVE, true);
+        bootstrap.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
+        bootstrap.childOption(ChannelOption.TCP_NODELAY, true);
+        bootstrap.handler(new LoggingHandler(LogLevel.INFO));
+        bootstrap.childHandler(new CDCServerHandlerInitializer());
+        List<ChannelFuture> result = new ArrayList<>();
+        for (String each : bindAddresses) {
+            result.add(bootstrap.bind(each, bindPort).sync());
+        }
+        return result;
+    }
+    
+    /**
+     * Create the boss group (one thread) and the worker group (default thread count), using the
+     * epoll implementation when it is available and NIO otherwise.
+     */
+    private void createEventLoopGroup() {
+        if (Epoll.isAvailable()) {
+            bossGroup = new EpollEventLoopGroup(1);
+            workerGroup = new EpollEventLoopGroup();
+        } else {
+            bossGroup = new NioEventLoopGroup(1);
+            workerGroup = new NioEventLoopGroup();
+        }
+    }
+    
+    /**
+     * Request graceful shutdown of whichever event loop groups were created.
+     */
+    private void close() {
+        if (null != bossGroup) {
+            bossGroup.shutdownGracefully();
+        }
+        if (null != workerGroup) {
+            workerGroup.shutdownGracefully();
+        }
+    }
+}
diff --git a/proxy/frontend/core/src/main/java/org/apache/shardingsphere/proxy/frontend/netty/CDCChannelInboundHandler.java b/proxy/frontend/core/src/main/java/org/apache/shardingsphere/proxy/frontend/netty/CDCChannelInboundHandler.java
new file mode 100644
index 00000000000..c52ff9a1f2a
--- /dev/null
+++ b/proxy/frontend/core/src/main/java/org/apache/shardingsphere/proxy/frontend/netty/CDCChannelInboundHandler.java
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.proxy.frontend.netty;
+
+import com.google.common.hash.Hashing;
+import io.netty.channel.ChannelFutureListener;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelInboundHandlerAdapter;
+import io.netty.util.AttributeKey;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.shardingsphere.authority.rule.AuthorityRule;
+import org.apache.shardingsphere.data.pipeline.cdc.common.CDCResponseErrorCode;
+import org.apache.shardingsphere.data.pipeline.cdc.constant.CDCConnectionStatus;
+import org.apache.shardingsphere.data.pipeline.cdc.context.CDCConnectionContext;
+import org.apache.shardingsphere.data.pipeline.cdc.generator.CDCResponseGenerator;
+import org.apache.shardingsphere.data.pipeline.cdc.protocol.request.CDCRequest;
+import org.apache.shardingsphere.data.pipeline.cdc.protocol.request.LoginRequest.BasicBody;
+import org.apache.shardingsphere.data.pipeline.cdc.protocol.response.CDCResponse;
+import org.apache.shardingsphere.data.pipeline.cdc.protocol.response.CreateSubscriptionResult;
+import org.apache.shardingsphere.data.pipeline.cdc.protocol.response.ServerGreetingResult;
+import org.apache.shardingsphere.infra.autogen.version.ShardingSphereVersion;
+import org.apache.shardingsphere.infra.metadata.user.Grantee;
+import org.apache.shardingsphere.infra.metadata.user.ShardingSphereUser;
+import org.apache.shardingsphere.infra.rule.ShardingSphereRule;
+import org.apache.shardingsphere.proxy.backend.context.ProxyContext;
+
+import java.net.InetSocketAddress;
+import java.net.SocketAddress;
+import java.nio.charset.StandardCharsets;
+import java.security.MessageDigest;
+import java.util.Collection;
+import java.util.Objects;
+import java.util.Optional;
+
+/**
+ * CDC channel inbound handler.
+ *
+ * <p>Sends a server greeting when a connection becomes active, requires a successful login before any other request is dispatched,
+ * and afterwards dispatches requests by their request case. The per-connection status is kept in a channel attribute.</p>
+ */
+@Slf4j
+public final class CDCChannelInboundHandler extends ChannelInboundHandlerAdapter {
+    
+    private static final AttributeKey<CDCConnectionContext> CONNECTION_CONTEXT_KEY = AttributeKey.valueOf("connection.context");
+    
+    /**
+     * Register a not-logged-in connection context on the channel and send the server greeting.
+     *
+     * @param ctx channel handler context
+     */
+    @Override
+    public void channelActive(final ChannelHandlerContext ctx) {
+        CDCConnectionContext context = new CDCConnectionContext();
+        context.setStatus(CDCConnectionStatus.NOT_LOGGED_IN);
+        ctx.channel().attr(CONNECTION_CONTEXT_KEY).setIfAbsent(context);
+        CDCResponse response = CDCResponse.newBuilder().setServerGreetingResult(ServerGreetingResult.newBuilder().setServerVersion(ShardingSphereVersion.VERSION).setProtocolVersion("1")
+                .build()).build();
+        ctx.writeAndFlush(response);
+    }
+    
+    /**
+     * Handle one decoded request.
+     *
+     * <p>While the connection is not logged in, every request is treated as a login attempt. After login, requests are dispatched by request case.</p>
+     *
+     * @param ctx channel handler context
+     * @param msg decoded message, cast to {@link CDCRequest}
+     */
+    @Override
+    public void channelRead(final ChannelHandlerContext ctx, final Object msg) {
+        CDCConnectionContext connectionContext = ctx.channel().attr(CONNECTION_CONTEXT_KEY).get();
+        CDCConnectionStatus status = connectionContext.getStatus();
+        CDCRequest request = (CDCRequest) msg;
+        if (CDCConnectionStatus.NOT_LOGGED_IN == status) {
+            processLogin(ctx, request);
+            return;
+        }
+        switch (request.getRequestCase()) {
+            case CREATE_SUBSCRIPTION:
+                processCreateSubscription(ctx, request);
+                break;
+            case START_SUBSCRIPTION:
+                processStartSubscription(ctx, request, connectionContext);
+                break;
+            case STOP_SUBSCRIPTION:
+                processStopSubscription(ctx, request, connectionContext);
+                break;
+            case DROP_SUBSCRIPTION:
+                processDropSubscription(ctx, request);
+                break;
+            case ACK_REQUEST:
+                // Acknowledgements are accepted but not processed yet.
+                break;
+            default:
+                log.warn("Cannot handle this type of request {}", request);
+        }
+    }
+    
+    /**
+     * Authenticate the connection with the basic login body of the request.
+     *
+     * <p>On success the connection status becomes LOGGED_IN and a succeed response is written.
+     * On any failure a failed response is written and the channel is closed once the write completes.</p>
+     *
+     * @param ctx channel handler context
+     * @param request request expected to carry a login with basic body
+     */
+    private void processLogin(final ChannelHandlerContext ctx, final CDCRequest request) {
+        if (!request.hasLogin() || !request.getLogin().hasBasicBody()) {
+            ctx.writeAndFlush(CDCResponseGenerator.failed(request.getRequestId(), CDCResponseErrorCode.ILLEGAL_REQUEST_ERROR, "Miss login request body")).addListener(ChannelFutureListener.CLOSE);
+            return;
+        }
+        BasicBody body = request.getLogin().getBasicBody();
+        Collection<ShardingSphereRule> globalRules = ProxyContext.getInstance().getContextManager().getMetaDataContexts().getMetaData().getGlobalRuleMetaData().getRules();
+        Optional<AuthorityRule> authorityRule = globalRules.stream().filter(rule -> rule instanceof AuthorityRule).map(rule -> (AuthorityRule) rule).findFirst();
+        if (!authorityRule.isPresent()) {
+            ctx.writeAndFlush(CDCResponseGenerator.failed(request.getRequestId(), CDCResponseErrorCode.SERVER_ERROR, "Not find authority rule")).addListener(ChannelFutureListener.CLOSE);
+            return;
+        }
+        Optional<ShardingSphereUser> user = authorityRule.get().findUser(new Grantee(body.getUsername(), getHostAddress(ctx)));
+        if (user.isPresent() && isPasswordMatched(user.get(), body.getPassword())) {
+            ctx.channel().attr(CONNECTION_CONTEXT_KEY).get().setStatus(CDCConnectionStatus.LOGGED_IN);
+            ctx.writeAndFlush(CDCResponseGenerator.succeedBuilder(request.getRequestId()).build());
+            return;
+        }
+        ctx.writeAndFlush(CDCResponseGenerator.failed(request.getRequestId(), CDCResponseErrorCode.SERVER_ERROR, "Incorrect username or password")).addListener(ChannelFutureListener.CLOSE);
+    }
+    
+    /**
+     * Check whether the digest sent by the client equals the upper-cased SHA-256 hex digest of the user's configured password.
+     *
+     * @param user user found by the authority rule
+     * @param actualPasswordDigest password digest carried by the login request
+     * @return true if both digests are equal
+     */
+    private boolean isPasswordMatched(final ShardingSphereUser user, final String actualPasswordDigest) {
+        // Encode with an explicit charset so that the digest does not depend on the default encoding of the JVM.
+        String expectedPasswordDigest = Hashing.sha256().hashBytes(user.getPassword().getBytes(StandardCharsets.UTF_8)).toString().toUpperCase();
+        // Compare in constant time so that the response time does not reveal how many leading characters matched.
+        return MessageDigest.isEqual(expectedPasswordDigest.getBytes(StandardCharsets.UTF_8), actualPasswordDigest.getBytes(StandardCharsets.UTF_8));
+    }
+    
+    /**
+     * Get the remote host address of the channel.
+     *
+     * @param context channel handler context
+     * @return IP address string for an internet socket address, otherwise the string form of the remote address
+     */
+    private String getHostAddress(final ChannelHandlerContext context) {
+        SocketAddress socketAddress = context.channel().remoteAddress();
+        return socketAddress instanceof InetSocketAddress ? ((InetSocketAddress) socketAddress).getAddress().getHostAddress() : socketAddress.toString();
+    }
+    
+    /**
+     * Reply to a create subscription request by echoing the requested subscription name.
+     *
+     * @param ctx channel handler context
+     * @param request create subscription request
+     */
+    private void processCreateSubscription(final ChannelHandlerContext ctx, final CDCRequest request) {
+        if (!request.hasCreateSubscription()) {
+            ctx.writeAndFlush(CDCResponseGenerator.failed(request.getRequestId(), CDCResponseErrorCode.ILLEGAL_REQUEST_ERROR, "Miss create subscription request body"))
+                    .addListener(ChannelFutureListener.CLOSE);
+            return;
+        }
+        // TODO waiting for pipeline refactoring finished
+        CreateSubscriptionResult subscriptionResult = CreateSubscriptionResult.newBuilder().setSubscriptionName(request.getCreateSubscription().getSubscriptionName()).build();
+        ctx.writeAndFlush(CDCResponseGenerator.succeedBuilder(request.getRequestId()).setCreateSubscriptionResult(subscriptionResult).build());
+    }
+    
+    /**
+     * Mark the connection as SUBSCRIBED and reply with a succeed response.
+     *
+     * @param ctx channel handler context
+     * @param request start subscription request
+     * @param connectionContext connection context of the channel
+     */
+    private void processStartSubscription(final ChannelHandlerContext ctx, final CDCRequest request, final CDCConnectionContext connectionContext) {
+        // TODO waiting for pipeline refactoring finished
+        connectionContext.setStatus(CDCConnectionStatus.SUBSCRIBED);
+        ctx.writeAndFlush(CDCResponseGenerator.succeedBuilder(request.getRequestId()).build());
+    }
+    
+    /**
+     * Put the connection back to LOGGED_IN and reply with a succeed response.
+     *
+     * @param ctx channel handler context
+     * @param request stop subscription request
+     * @param connectionContext connection context of the channel
+     */
+    private void processStopSubscription(final ChannelHandlerContext ctx, final CDCRequest request, final CDCConnectionContext connectionContext) {
+        // TODO waiting for pipeline refactoring finished
+        connectionContext.setStatus(CDCConnectionStatus.LOGGED_IN);
+        ctx.writeAndFlush(CDCResponseGenerator.succeedBuilder(request.getRequestId()).build());
+    }
+    
+    /**
+     * Reply to a drop subscription request with a succeed response.
+     *
+     * @param ctx channel handler context
+     * @param request drop subscription request
+     */
+    private void processDropSubscription(final ChannelHandlerContext ctx, final CDCRequest request) {
+        // TODO waiting for pipeline refactoring finished
+        ctx.writeAndFlush(CDCResponseGenerator.succeedBuilder(request.getRequestId()).build());
+    }
+}
diff --git a/proxy/frontend/core/src/main/java/org/apache/shardingsphere/proxy/frontend/netty/CDCServerHandlerInitializer.java b/proxy/frontend/core/src/main/java/org/apache/shardingsphere/proxy/frontend/netty/CDCServerHandlerInitializer.java
new file mode 100644
index 00000000000..cc108b805f2
--- /dev/null
+++ b/proxy/frontend/core/src/main/java/org/apache/shardingsphere/proxy/frontend/netty/CDCServerHandlerInitializer.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.proxy.frontend.netty;
+
+import io.netty.channel.ChannelInitializer;
+import io.netty.channel.socket.SocketChannel;
+import io.netty.handler.codec.protobuf.ProtobufDecoder;
+import io.netty.handler.codec.protobuf.ProtobufEncoder;
+import io.netty.handler.codec.protobuf.ProtobufVarint32FrameDecoder;
+import io.netty.handler.codec.protobuf.ProtobufVarint32LengthFieldPrepender;
+import org.apache.shardingsphere.data.pipeline.cdc.protocol.request.CDCRequest;
+
+/**
+ * CDC server handler initializer.
+ */
+public final class CDCServerHandlerInitializer extends ChannelInitializer<SocketChannel> {
+    
+    @Override
+    protected void initChannel(final SocketChannel channel) {
+        channel.pipeline().addLast(new ProtobufVarint32FrameDecoder());
+        channel.pipeline().addLast(new ProtobufDecoder(CDCRequest.getDefaultInstance()));
+        channel.pipeline().addLast(new ProtobufVarint32LengthFieldPrepender());
+        channel.pipeline().addLast(new ProtobufEncoder());
+        channel.pipeline().addLast(new CDCChannelInboundHandler());
+    }
+}
diff --git a/proxy/frontend/core/src/test/java/org/apache/shardingsphere/proxy/frontend/netty/CDCChannelInboundHandlerTest.java b/proxy/frontend/core/src/test/java/org/apache/shardingsphere/proxy/frontend/netty/CDCChannelInboundHandlerTest.java
new file mode 100644
index 00000000000..845a270356f
--- /dev/null
+++ b/proxy/frontend/core/src/test/java/org/apache/shardingsphere/proxy/frontend/netty/CDCChannelInboundHandlerTest.java
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.proxy.frontend.netty;
+
+import com.google.common.hash.Hashing;
+import io.netty.channel.embedded.EmbeddedChannel;
+import io.netty.handler.logging.LoggingHandler;
+import org.apache.shardingsphere.authority.rule.AuthorityRule;
+import org.apache.shardingsphere.data.pipeline.cdc.common.CDCResponseErrorCode;
+import org.apache.shardingsphere.data.pipeline.cdc.protocol.request.CDCRequest;
+import org.apache.shardingsphere.data.pipeline.cdc.protocol.request.CDCRequest.Builder;
+import org.apache.shardingsphere.data.pipeline.cdc.protocol.request.LoginRequest;
+import org.apache.shardingsphere.data.pipeline.cdc.protocol.request.LoginRequest.BasicBody;
+import org.apache.shardingsphere.data.pipeline.cdc.protocol.response.CDCResponse;
+import org.apache.shardingsphere.data.pipeline.cdc.protocol.response.CDCResponse.Status;
+import org.apache.shardingsphere.infra.metadata.database.rule.ShardingSphereRuleMetaData;
+import org.apache.shardingsphere.infra.metadata.user.Grantee;
+import org.apache.shardingsphere.infra.metadata.user.ShardingSphereUser;
+import org.apache.shardingsphere.infra.rule.ShardingSphereRule;
+import org.apache.shardingsphere.proxy.backend.context.ProxyContext;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.mockito.MockedStatic;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Optional;
+
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.RETURNS_DEEP_STUBS;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.mockStatic;
+import static org.mockito.Mockito.when;
+
+public final class CDCChannelInboundHandlerTest {
+    
+    private static MockedStatic<ProxyContext> proxyContext;
+    
+    private final CDCChannelInboundHandler cdcChannelInboundHandler = new CDCChannelInboundHandler();
+    
+    private EmbeddedChannel channel;
+    
+    @BeforeClass
+    public static void beforeClass() {
+        proxyContext = mockStatic(ProxyContext.class);
+        ProxyContext mockedProxyContext = mock(ProxyContext.class, RETURNS_DEEP_STUBS);
+        proxyContext.when(ProxyContext::getInstance).thenReturn(mockedProxyContext);
+        ShardingSphereRuleMetaData globalRuleMetaData = mock(ShardingSphereRuleMetaData.class);
+        when(mockedProxyContext.getContextManager().getMetaDataContexts().getMetaData().getGlobalRuleMetaData()).thenReturn(globalRuleMetaData);
+        List<ShardingSphereRule> rules = Collections.singletonList(mockAuthRule());
+        when(globalRuleMetaData.getRules()).thenReturn(rules);
+    }
+    
+    private static AuthorityRule mockAuthRule() {
+        AuthorityRule result = mock(AuthorityRule.class);
+        ShardingSphereUser mockUser = mock(ShardingSphereUser.class);
+        when(mockUser.getGrantee()).thenReturn(new Grantee("root", "%"));
+        when(mockUser.getPassword()).thenReturn("root");
+        when(result.findUser(any())).thenReturn(Optional.of(mockUser));
+        return result;
+    }
+    
+    @AfterClass
+    public static void afterClass() {
+        proxyContext.close();
+    }
+    
+    @Before
+    public void setup() {
+        channel = new EmbeddedChannel(new LoggingHandler(), cdcChannelInboundHandler);
+    }
+    
+    @Test
+    public void assertLoginRequestFailed() {
+        CDCRequest actualRequest = CDCRequest.newBuilder().setLogin(LoginRequest.newBuilder().setBasicBody(BasicBody.newBuilder().setUsername("root2").build()).build()).build();
+        channel.writeInbound(actualRequest);
+        CDCResponse expectedGreetingResult = channel.readOutbound();
+        assertTrue(expectedGreetingResult.hasServerGreetingResult());
+        CDCResponse expectedLoginResult = channel.readOutbound();
+        assertThat(expectedLoginResult.getStatus(), is(Status.FAILED));
+        assertThat(expectedLoginResult.getErrorCode(), is(CDCResponseErrorCode.SERVER_ERROR.getCode()));
+        assertFalse(channel.isOpen());
+    }
+    
+    @Test
+    public void assertIllegalLoginRequest() {
+        CDCRequest actualRequest = CDCRequest.newBuilder().setVersion(1).setRequestId("test").build();
+        channel.writeInbound(actualRequest);
+        CDCResponse expectedGreetingResult = channel.readOutbound();
+        assertTrue(expectedGreetingResult.hasServerGreetingResult());
+        CDCResponse expectedLoginResult = channel.readOutbound();
+        assertThat(expectedLoginResult.getStatus(), is(Status.FAILED));
+        assertThat(expectedLoginResult.getErrorCode(), is(CDCResponseErrorCode.ILLEGAL_REQUEST_ERROR.getCode()));
+        assertFalse(channel.isOpen());
+    }
+    
+    @Test
+    public void assertLoginRequestSucceed() {
+        String encryptPassword = Hashing.sha256().hashBytes("root".getBytes()).toString().toUpperCase();
+        Builder builder = CDCRequest.newBuilder().setLogin(LoginRequest.newBuilder().setBasicBody(BasicBody.newBuilder().setUsername("root").setPassword(encryptPassword).build()).build());
+        CDCRequest actualRequest = builder.build();
+        channel.writeInbound(actualRequest);
+        CDCResponse expectedGreetingResult = channel.readOutbound();
+        assertTrue(expectedGreetingResult.hasServerGreetingResult());
+        CDCResponse expectedLoginResult = channel.readOutbound();
+        assertThat(expectedLoginResult.getStatus(), is(Status.SUCCEED));
+        assertThat(expectedLoginResult.getErrorCode(), is(""));
+        assertThat(expectedLoginResult.getErrorMessage(), is(""));
+    }
+}
diff --git a/proxy/frontend/core/src/test/java/org/apache/shardingsphere/proxy/frontend/netty/CDCServerHandlerInitializerTest.java b/proxy/frontend/core/src/test/java/org/apache/shardingsphere/proxy/frontend/netty/CDCServerHandlerInitializerTest.java
new file mode 100644
index 00000000000..bb6566c5d0d
--- /dev/null
+++ b/proxy/frontend/core/src/test/java/org/apache/shardingsphere/proxy/frontend/netty/CDCServerHandlerInitializerTest.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.proxy.frontend.netty;
+
+import io.netty.channel.ChannelPipeline;
+import io.netty.channel.socket.SocketChannel;
+import io.netty.handler.codec.protobuf.ProtobufDecoder;
+import io.netty.handler.codec.protobuf.ProtobufEncoder;
+import io.netty.handler.codec.protobuf.ProtobufVarint32FrameDecoder;
+import io.netty.handler.codec.protobuf.ProtobufVarint32LengthFieldPrepender;
+import org.junit.Test;
+
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+public final class CDCServerHandlerInitializerTest {
+    
+    /**
+     * Verify that initializing a channel adds one handler of each expected type to its pipeline.
+     */
+    @Test
+    public void assertInitChannel() {
+        ChannelPipeline mockedPipeline = mock(ChannelPipeline.class);
+        SocketChannel mockedChannel = mock(SocketChannel.class);
+        when(mockedChannel.pipeline()).thenReturn(mockedPipeline);
+        new CDCServerHandlerInitializer().initChannel(mockedChannel);
+        verify(mockedPipeline).addLast(any(ProtobufVarint32FrameDecoder.class));
+        verify(mockedPipeline).addLast(any(ProtobufDecoder.class));
+        verify(mockedPipeline).addLast(any(ProtobufVarint32LengthFieldPrepender.class));
+        verify(mockedPipeline).addLast(any(ProtobufEncoder.class));
+        verify(mockedPipeline).addLast(any(CDCChannelInboundHandler.class));
+    }
+}