You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@shardingsphere.apache.org by zh...@apache.org on 2022/12/05 06:35:27 UTC
[shardingsphere] branch master updated: Add CDC rule and login implemention (#22579)
This is an automated email from the ASF dual-hosted git repository.
zhonghongsheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new 25165109c17 Add CDC rule and login implemention (#22579)
25165109c17 is described below
commit 25165109c1771f52eeaae715ee4c5f49ba620f47
Author: Xinze Guo <10...@users.noreply.github.com>
AuthorDate: Mon Dec 5 14:35:18 2022 +0800
Add CDC rule and login implemention (#22579)
* Add cdc protocol
* Add cdc rule and connection context
* Add CDC server and handler, update server.yaml config
* Add CDC client login implementation
* Fix codestyle
* Improve code
* Add volatile
* Remove logback dependency
---
kernel/data-pipeline/cdc/client/pom.xml | 10 ++
.../data/pipeline/cdc/client/CDCClient.java | 76 +++++++++++
.../cdc/client/event/CreateSubscriptionEvent.java | 24 ++++
.../cdc/client/handler/LoginRequestHandler.java | 96 +++++++++++++
.../client/handler/SubscriptionRequestHandler.java | 84 ++++++++++++
.../pipeline/cdc/client/util/RequestIdUtil.java | 39 ++++++
.../pipeline/cdc/config/CDCRuleConfiguration.java | 34 +++++
.../pipeline/cdc/constant/CDCConnectionStatus.java | 30 +++++
.../data/pipeline/cdc/constant/CDCOrder.java | 26 ++++
.../pipeline/cdc/context/CDCConnectionContext.java | 32 +++++
.../cdc/generator/CDCResponseGenerator.java | 51 +++++++
.../data/pipeline/cdc/rule/CDCRule.java | 46 +++++++
.../pipeline/cdc/rule/builder/CDCRuleBuilder.java | 50 +++++++
.../cdc/yaml/config/YamlCDCRuleConfiguration.java | 41 ++++++
.../swapper/YamlCDCRuleConfigurationSwapper.java | 57 ++++++++
...ere.infra.rule.builder.global.GlobalRuleBuilder | 18 +++
...onfig.swapper.rule.YamlRuleConfigurationSwapper | 18 +++
kernel/data-pipeline/cdc/protocol/pom.xml | 41 ++++++
.../pipeline/cdc/common/CDCResponseErrorCode.java | 35 +++++
.../src/main/proto/CDCRequestProtocol.proto | 96 +++++++++++++
.../src/main/proto/CDCResponseProtocol.proto | 82 ++++++++++++
.../cdc/protocol/src/main/resources/.gitkeep | 18 +++
pom.xml | 16 ++-
proxy/backend/pom.xml | 5 +
.../backend/config/ProxyConfigurationLoader.java | 3 +
.../config/yaml/YamlProxyServerConfiguration.java | 3 +
.../config/ProxyConfigurationLoaderTest.java | 1 +
.../org/apache/shardingsphere/proxy/Bootstrap.java | 5 +
.../bootstrap/src/main/resources/conf/server.yaml | 4 +
.../shardingsphere/proxy/frontend/CDCServer.java | 101 ++++++++++++++
.../frontend/netty/CDCChannelInboundHandler.java | 148 +++++++++++++++++++++
.../netty/CDCServerHandlerInitializer.java | 41 ++++++
.../netty/CDCChannelInboundHandlerTest.java | 131 ++++++++++++++++++
.../netty/CDCServerHandlerInitializerTest.java | 48 +++++++
34 files changed, 1509 insertions(+), 1 deletion(-)
diff --git a/kernel/data-pipeline/cdc/client/pom.xml b/kernel/data-pipeline/cdc/client/pom.xml
index 2da0211d90b..4277e618b8d 100644
--- a/kernel/data-pipeline/cdc/client/pom.xml
+++ b/kernel/data-pipeline/cdc/client/pom.xml
@@ -33,5 +33,15 @@
<artifactId>shardingsphere-data-pipeline-cdc-protocol</artifactId>
<version>${project.version}</version>
</dependency>
+ <dependency>
+ <groupId>io.netty</groupId>
+ <artifactId>netty-buffer</artifactId>
+ <version>${netty.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>io.netty</groupId>
+ <artifactId>netty-codec</artifactId>
+ <version>${netty.version}</version>
+ </dependency>
</dependencies>
</project>
diff --git a/kernel/data-pipeline/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/CDCClient.java b/kernel/data-pipeline/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/CDCClient.java
new file mode 100644
index 00000000000..03c9a510a0c
--- /dev/null
+++ b/kernel/data-pipeline/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/CDCClient.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.data.pipeline.cdc.client;
+
+import io.netty.bootstrap.Bootstrap;
+import io.netty.buffer.PooledByteBufAllocator;
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelInitializer;
+import io.netty.channel.ChannelOption;
+import io.netty.channel.nio.NioEventLoopGroup;
+import io.netty.channel.socket.nio.NioSocketChannel;
+import io.netty.handler.codec.protobuf.ProtobufDecoder;
+import io.netty.handler.codec.protobuf.ProtobufEncoder;
+import io.netty.handler.codec.protobuf.ProtobufVarint32FrameDecoder;
+import io.netty.handler.codec.protobuf.ProtobufVarint32LengthFieldPrepender;
+import lombok.SneakyThrows;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.shardingsphere.data.pipeline.cdc.client.handler.LoginRequestHandler;
+import org.apache.shardingsphere.data.pipeline.cdc.client.handler.SubscriptionRequestHandler;
+import org.apache.shardingsphere.data.pipeline.cdc.protocol.response.CDCResponse;
+
+/**
+ * CDC client.
+ */
+@Slf4j
+public final class CDCClient {
+
+ /**
+ * Start ShardingSphere CDC client.
+ *
+ * @param port port
+ * @param address addresses
+ */
+ @SneakyThrows(InterruptedException.class)
+ public void start(final String address, final int port) {
+ startInternal(address, port);
+ }
+
+ private void startInternal(final String address, final int port) throws InterruptedException {
+ Bootstrap bootstrap = new Bootstrap();
+ bootstrap.channel(NioSocketChannel.class)
+ .group(new NioEventLoopGroup())
+ .option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
+ .option(ChannelOption.SO_REUSEADDR, true)
+ .handler(new ChannelInitializer<NioSocketChannel>() {
+
+ @Override
+ protected void initChannel(final NioSocketChannel channel) {
+ channel.pipeline().addLast(new ProtobufVarint32FrameDecoder());
+ channel.pipeline().addLast(new ProtobufDecoder(CDCResponse.getDefaultInstance()));
+ channel.pipeline().addLast(new ProtobufVarint32LengthFieldPrepender());
+ channel.pipeline().addLast(new ProtobufEncoder());
+ // TODO username and password are read from the configuration file or args
+ channel.pipeline().addLast(new LoginRequestHandler("root", "root"));
+ channel.pipeline().addLast(new SubscriptionRequestHandler());
+ }
+ });
+ ChannelFuture future = bootstrap.connect(address, port).sync();
+ future.channel().closeFuture().sync();
+ }
+}
diff --git a/kernel/data-pipeline/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/event/CreateSubscriptionEvent.java b/kernel/data-pipeline/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/event/CreateSubscriptionEvent.java
new file mode 100644
index 00000000000..59bb67e898c
--- /dev/null
+++ b/kernel/data-pipeline/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/event/CreateSubscriptionEvent.java
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.data.pipeline.cdc.client.event;
+
+/**
+ * Create subscription event.
+ */
+public final class CreateSubscriptionEvent {
+}
diff --git a/kernel/data-pipeline/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/handler/LoginRequestHandler.java b/kernel/data-pipeline/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/handler/LoginRequestHandler.java
new file mode 100644
index 00000000000..435862b143a
--- /dev/null
+++ b/kernel/data-pipeline/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/handler/LoginRequestHandler.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.data.pipeline.cdc.client.handler;
+
+import com.google.common.hash.Hashing;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelInboundHandlerAdapter;
+import io.netty.util.AttributeKey;
+import lombok.RequiredArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.shardingsphere.data.pipeline.cdc.client.event.CreateSubscriptionEvent;
+import org.apache.shardingsphere.data.pipeline.cdc.client.util.RequestIdUtil;
+import org.apache.shardingsphere.data.pipeline.cdc.protocol.request.CDCRequest;
+import org.apache.shardingsphere.data.pipeline.cdc.protocol.request.CDCRequest.Type;
+import org.apache.shardingsphere.data.pipeline.cdc.protocol.request.LoginRequest;
+import org.apache.shardingsphere.data.pipeline.cdc.protocol.request.LoginRequest.BasicBody;
+import org.apache.shardingsphere.data.pipeline.cdc.protocol.request.LoginRequest.LoginType;
+import org.apache.shardingsphere.data.pipeline.cdc.protocol.response.CDCResponse;
+import org.apache.shardingsphere.data.pipeline.cdc.protocol.response.CDCResponse.Status;
+import org.apache.shardingsphere.data.pipeline.cdc.protocol.response.ServerGreetingResult;
+
+import java.util.Objects;
+
+/**
+ * Login request handler.
+ */
+@Slf4j
+@RequiredArgsConstructor
+public final class LoginRequestHandler extends ChannelInboundHandlerAdapter {
+
+ private static final AttributeKey<String> LOGIN_REQUEST_ID_KEY = AttributeKey.valueOf("login.request.id");
+
+ private final String username;
+
+ private final String password;
+
+ private boolean loggedIn;
+
+ @Override
+ public void channelInactive(final ChannelHandlerContext ctx) {
+ ctx.channel().attr(LOGIN_REQUEST_ID_KEY).set(null);
+ }
+
+ @Override
+ public void channelRead(final ChannelHandlerContext ctx, final Object msg) {
+ if (loggedIn) {
+ ctx.fireChannelRead(msg);
+ return;
+ }
+ CDCResponse response = (CDCResponse) msg;
+ if (response.hasServerGreetingResult()) {
+ ServerGreetingResult serverGreetingResult = response.getServerGreetingResult();
+ log.info("Server greeting result, server version: {}, min protocol version: {}", serverGreetingResult.getServerVersion(), serverGreetingResult.getProtocolVersion());
+ sendLoginRequest(ctx);
+ return;
+ }
+ if (Status.FAILED == response.getStatus()) {
+ log.error("login failed, {}", msg);
+ return;
+ }
+ if (Status.SUCCEED == response.getStatus() && Objects.equals(ctx.channel().attr(LOGIN_REQUEST_ID_KEY).get(), response.getRequestId())) {
+ log.info("login success, username {}", username);
+ loggedIn = true;
+ ctx.fireUserEventTriggered(new CreateSubscriptionEvent());
+ }
+ }
+
+ private void sendLoginRequest(final ChannelHandlerContext ctx) {
+ String encryptPassword = Hashing.sha256().hashBytes(password.getBytes()).toString().toUpperCase();
+ LoginRequest loginRequest = LoginRequest.newBuilder().setType(LoginType.BASIC).setBasicBody(BasicBody.newBuilder().setUsername(username).setPassword(encryptPassword).build()).build();
+ String loginRequestId = RequestIdUtil.generateRequestId();
+ ctx.channel().attr(LOGIN_REQUEST_ID_KEY).setIfAbsent(loginRequestId);
+ CDCRequest data = CDCRequest.newBuilder().setType(Type.LOGIN).setVersion(1).setRequestId(loginRequestId).setLogin(loginRequest).build();
+ ctx.writeAndFlush(data);
+ }
+
+ @Override
+ public void exceptionCaught(final ChannelHandlerContext ctx, final Throwable cause) {
+ log.error("login handler error", cause);
+ }
+}
diff --git a/kernel/data-pipeline/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/handler/SubscriptionRequestHandler.java b/kernel/data-pipeline/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/handler/SubscriptionRequestHandler.java
new file mode 100644
index 00000000000..3fa4cb4bb81
--- /dev/null
+++ b/kernel/data-pipeline/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/handler/SubscriptionRequestHandler.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.data.pipeline.cdc.client.handler;
+
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelInboundHandlerAdapter;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.shardingsphere.data.pipeline.cdc.client.event.CreateSubscriptionEvent;
+import org.apache.shardingsphere.data.pipeline.cdc.client.util.RequestIdUtil;
+import org.apache.shardingsphere.data.pipeline.cdc.protocol.request.CDCRequest;
+import org.apache.shardingsphere.data.pipeline.cdc.protocol.request.CDCRequest.Builder;
+import org.apache.shardingsphere.data.pipeline.cdc.protocol.request.CreateSubscriptionRequest;
+import org.apache.shardingsphere.data.pipeline.cdc.protocol.request.CreateSubscriptionRequest.SubscriptionMode;
+import org.apache.shardingsphere.data.pipeline.cdc.protocol.request.CreateSubscriptionRequest.TableName;
+import org.apache.shardingsphere.data.pipeline.cdc.protocol.request.StartSubscriptionRequest;
+import org.apache.shardingsphere.data.pipeline.cdc.protocol.response.CDCResponse;
+import org.apache.shardingsphere.data.pipeline.cdc.protocol.response.CDCResponse.Status;
+
+/**
+ * Subscription request handler.
+ */
+@Slf4j
+public final class SubscriptionRequestHandler extends ChannelInboundHandlerAdapter {
+
+ @Override
+ public void userEventTriggered(final ChannelHandlerContext ctx, final Object evt) {
+ if (evt instanceof CreateSubscriptionEvent) {
+ CDCRequest request = CDCRequest.newBuilder().setCreateSubscription(buildCreateSubscriptionRequest()).setRequestId(RequestIdUtil.generateRequestId()).build();
+ ctx.writeAndFlush(request);
+ }
+ }
+
+ private CreateSubscriptionRequest buildCreateSubscriptionRequest() {
+ // TODO the parameter shouldn't hard code, will be fixed when completed
+ TableName tableName = TableName.newBuilder().build();
+ return CreateSubscriptionRequest.newBuilder().setSubscriptionMode(SubscriptionMode.INCREMENTAL).setSubscriptionName("sharding_db").setDatabase("sharding_db")
+ .addTableNames(tableName).build();
+ }
+
+ @Override
+ public void channelRead(final ChannelHandlerContext ctx, final Object msg) {
+ CDCResponse response = (CDCResponse) msg;
+ if (Status.SUCCEED == response.getStatus()) {
+ processSucceed(ctx, response);
+ } else {
+ log.error("subscription response error {}", msg);
+ }
+ }
+
+ private void processSucceed(final ChannelHandlerContext ctx, final CDCResponse response) {
+ if (response.hasCreateSubscriptionResult()) {
+ log.info("create subscription succeed, subcrption name {}", response.getCreateSubscriptionResult().getSubscriptionName());
+ Builder builder = CDCRequest.newBuilder();
+ builder.setStartSubscription(buildStartSubscriptionRequest(response.getCreateSubscriptionResult().getSubscriptionName()));
+ builder.setRequestId(RequestIdUtil.generateRequestId());
+ ctx.writeAndFlush(builder.build());
+ }
+ // TODO waiting for pipeline refactoring finished
+ }
+
+ private StartSubscriptionRequest buildStartSubscriptionRequest(final String subscriptionName) {
+ return StartSubscriptionRequest.newBuilder().setSubscriptionName(subscriptionName).build();
+ }
+
+ @Override
+ public void exceptionCaught(final ChannelHandlerContext ctx, final Throwable cause) {
+ log.error("subscription handler error", cause);
+ }
+}
diff --git a/kernel/data-pipeline/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/util/RequestIdUtil.java b/kernel/data-pipeline/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/util/RequestIdUtil.java
new file mode 100644
index 00000000000..0c7f0a94dfa
--- /dev/null
+++ b/kernel/data-pipeline/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/util/RequestIdUtil.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.data.pipeline.cdc.client.util;
+
+import lombok.AccessLevel;
+import lombok.NoArgsConstructor;
+
+import java.util.UUID;
+
+/**
+ * Request id util.
+ */
+@NoArgsConstructor(access = AccessLevel.PRIVATE)
+public final class RequestIdUtil {
+
+ /**
+ * Generate request id.
+ *
+ * @return request id.
+ */
+ public static String generateRequestId() {
+ return UUID.randomUUID().toString();
+ }
+}
diff --git a/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/config/CDCRuleConfiguration.java b/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/config/CDCRuleConfiguration.java
new file mode 100644
index 00000000000..52c63af066e
--- /dev/null
+++ b/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/config/CDCRuleConfiguration.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.data.pipeline.cdc.config;
+
+import lombok.Getter;
+import lombok.RequiredArgsConstructor;
+import org.apache.shardingsphere.infra.config.rule.scope.GlobalRuleConfiguration;
+
+/**
+ * CDC rule configuration.
+ */
+@RequiredArgsConstructor
+@Getter
+public final class CDCRuleConfiguration implements GlobalRuleConfiguration {
+
+ private final boolean enabled;
+
+ private final int port;
+}
diff --git a/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/constant/CDCConnectionStatus.java b/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/constant/CDCConnectionStatus.java
new file mode 100644
index 00000000000..09ce0e484db
--- /dev/null
+++ b/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/constant/CDCConnectionStatus.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.data.pipeline.cdc.constant;
+
+/**
+ * CDC connection status.
+ */
+public enum CDCConnectionStatus {
+
+ NOT_LOGGED_IN,
+
+ LOGGED_IN,
+
+ SUBSCRIBED
+}
diff --git a/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/constant/CDCOrder.java b/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/constant/CDCOrder.java
new file mode 100644
index 00000000000..7fc35073699
--- /dev/null
+++ b/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/constant/CDCOrder.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.data.pipeline.cdc.constant;
+
+/**
+ * CDC rule order of load.
+ */
+public final class CDCOrder {
+
+ public static final int ORDER = 10000;
+}
diff --git a/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/context/CDCConnectionContext.java b/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/context/CDCConnectionContext.java
new file mode 100644
index 00000000000..2b74ddfd816
--- /dev/null
+++ b/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/context/CDCConnectionContext.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.data.pipeline.cdc.context;
+
+import lombok.Getter;
+import lombok.Setter;
+import org.apache.shardingsphere.data.pipeline.cdc.constant.CDCConnectionStatus;
+
+/**
+ * CDC connection context.
+ */
+@Getter
+public final class CDCConnectionContext {
+
+ @Setter
+ private volatile CDCConnectionStatus status;
+}
diff --git a/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/generator/CDCResponseGenerator.java b/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/generator/CDCResponseGenerator.java
new file mode 100644
index 00000000000..dc24261e26b
--- /dev/null
+++ b/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/generator/CDCResponseGenerator.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.data.pipeline.cdc.generator;
+
+import org.apache.shardingsphere.data.pipeline.cdc.common.CDCResponseErrorCode;
+import org.apache.shardingsphere.data.pipeline.cdc.protocol.response.CDCResponse;
+import org.apache.shardingsphere.data.pipeline.cdc.protocol.response.CDCResponse.Builder;
+import org.apache.shardingsphere.data.pipeline.cdc.protocol.response.CDCResponse.Status;
+
+/**
+ * CDC response message generator.
+ */
+public final class CDCResponseGenerator {
+
+ /**
+ * Succeed response builder.
+ *
+ * @param requestId request id
+ * @return succeed response builder
+ */
+ public static Builder succeedBuilder(final String requestId) {
+ return CDCResponse.newBuilder().setStatus(Status.SUCCEED).setRequestId(requestId);
+ }
+
+ /**
+ * Failed response.
+ *
+ * @param requestId request id
+ * @param errorCode error code
+ * @param errorMessage error message
+ * @return failed response
+ */
+ public static CDCResponse failed(final String requestId, final CDCResponseErrorCode errorCode, final String errorMessage) {
+ return CDCResponse.newBuilder().setStatus(Status.FAILED).setRequestId(requestId).setErrorCode(errorCode.getCode()).setErrorMessage(errorMessage).build();
+ }
+}
diff --git a/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/rule/CDCRule.java b/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/rule/CDCRule.java
new file mode 100644
index 00000000000..2f2b21df21f
--- /dev/null
+++ b/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/rule/CDCRule.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.data.pipeline.cdc.rule;
+
+import lombok.Getter;
+import lombok.RequiredArgsConstructor;
+import org.apache.shardingsphere.data.pipeline.cdc.config.CDCRuleConfiguration;
+import org.apache.shardingsphere.infra.config.rule.RuleConfiguration;
+import org.apache.shardingsphere.infra.rule.identifier.scope.GlobalRule;
+
+/**
+ * CDC rule.
+ */
+@RequiredArgsConstructor
+@Getter
+public final class CDCRule implements GlobalRule {
+
+ private final boolean enable;
+
+ private final int port;
+
+ @Override
+ public RuleConfiguration getConfiguration() {
+ return new CDCRuleConfiguration(enable, port);
+ }
+
+ @Override
+ public String getType() {
+ return CDCRule.class.getSimpleName();
+ }
+}
diff --git a/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/rule/builder/CDCRuleBuilder.java b/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/rule/builder/CDCRuleBuilder.java
new file mode 100644
index 00000000000..d4f38ea5b48
--- /dev/null
+++ b/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/rule/builder/CDCRuleBuilder.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.data.pipeline.cdc.rule.builder;
+
+import org.apache.shardingsphere.data.pipeline.cdc.config.CDCRuleConfiguration;
+import org.apache.shardingsphere.data.pipeline.cdc.constant.CDCOrder;
+import org.apache.shardingsphere.data.pipeline.cdc.rule.CDCRule;
+import org.apache.shardingsphere.infra.config.props.ConfigurationProperties;
+import org.apache.shardingsphere.infra.instance.InstanceContext;
+import org.apache.shardingsphere.infra.metadata.database.ShardingSphereDatabase;
+import org.apache.shardingsphere.infra.rule.builder.global.GlobalRuleBuilder;
+
+import java.util.Map;
+
+/**
+ * CDC rule builder.
+ */
+public final class CDCRuleBuilder implements GlobalRuleBuilder<CDCRuleConfiguration> {
+
+ @Override
+ public CDCRule build(final CDCRuleConfiguration ruleConfig, final Map<String, ShardingSphereDatabase> databases, final InstanceContext instanceContext,
+ final ConfigurationProperties props) {
+ return new CDCRule(ruleConfig.isEnabled(), ruleConfig.getPort());
+ }
+
+ @Override
+ public int getOrder() {
+ return CDCOrder.ORDER;
+ }
+
+ @Override
+ public Class<CDCRuleConfiguration> getTypeClass() {
+ return CDCRuleConfiguration.class;
+ }
+}
diff --git a/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/yaml/config/YamlCDCRuleConfiguration.java b/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/yaml/config/YamlCDCRuleConfiguration.java
new file mode 100644
index 00000000000..7768ee61a98
--- /dev/null
+++ b/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/yaml/config/YamlCDCRuleConfiguration.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.data.pipeline.cdc.yaml.config;
+
+import lombok.Getter;
+import lombok.Setter;
+import org.apache.shardingsphere.data.pipeline.cdc.config.CDCRuleConfiguration;
+import org.apache.shardingsphere.infra.config.rule.RuleConfiguration;
+import org.apache.shardingsphere.infra.yaml.config.pojo.rule.YamlRuleConfiguration;
+
+/**
+ * CDC configuration for YAML.
+ */
+@Getter
+@Setter
+public final class YamlCDCRuleConfiguration implements YamlRuleConfiguration {
+
+ private boolean enabled;
+
+ private int port = 33071;
+
+ @Override
+ public Class<? extends RuleConfiguration> getRuleConfigurationType() {
+ return CDCRuleConfiguration.class;
+ }
+}
diff --git a/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/yaml/swapper/YamlCDCRuleConfigurationSwapper.java b/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/yaml/swapper/YamlCDCRuleConfigurationSwapper.java
new file mode 100644
index 00000000000..7ef181271bf
--- /dev/null
+++ b/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/yaml/swapper/YamlCDCRuleConfigurationSwapper.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.data.pipeline.cdc.yaml.swapper;
+
+import org.apache.shardingsphere.data.pipeline.cdc.config.CDCRuleConfiguration;
+import org.apache.shardingsphere.data.pipeline.cdc.constant.CDCOrder;
+import org.apache.shardingsphere.data.pipeline.cdc.yaml.config.YamlCDCRuleConfiguration;
+import org.apache.shardingsphere.infra.yaml.config.swapper.rule.YamlRuleConfigurationSwapper;
+
+/**
+ * YAML CDC rule configuration swapper.
+ */
+public final class YamlCDCRuleConfigurationSwapper implements YamlRuleConfigurationSwapper<YamlCDCRuleConfiguration, CDCRuleConfiguration> {
+
+ @Override
+ public YamlCDCRuleConfiguration swapToYamlConfiguration(final CDCRuleConfiguration data) {
+ YamlCDCRuleConfiguration result = new YamlCDCRuleConfiguration();
+ result.setPort(null == data ? 33071 : data.getPort());
+ result.setEnabled(null != data && data.isEnabled());
+ return result;
+ }
+
+ @Override
+ public CDCRuleConfiguration swapToObject(final YamlCDCRuleConfiguration yamlConfig) {
+ return new CDCRuleConfiguration(yamlConfig.isEnabled(), yamlConfig.getPort());
+ }
+
+ @Override
+ public Class<CDCRuleConfiguration> getTypeClass() {
+ return CDCRuleConfiguration.class;
+ }
+
+ @Override
+ public String getRuleTagName() {
+ return "CDC";
+ }
+
+ @Override
+ public int getOrder() {
+ return CDCOrder.ORDER;
+ }
+}
diff --git a/kernel/data-pipeline/cdc/core/src/main/resources/META-INF/services/org.apache.shardingsphere.infra.rule.builder.global.GlobalRuleBuilder b/kernel/data-pipeline/cdc/core/src/main/resources/META-INF/services/org.apache.shardingsphere.infra.rule.builder.global.GlobalRuleBuilder
new file mode 100644
index 00000000000..8bf328f3517
--- /dev/null
+++ b/kernel/data-pipeline/cdc/core/src/main/resources/META-INF/services/org.apache.shardingsphere.infra.rule.builder.global.GlobalRuleBuilder
@@ -0,0 +1,18 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+org.apache.shardingsphere.data.pipeline.cdc.rule.builder.CDCRuleBuilder
diff --git a/kernel/data-pipeline/cdc/core/src/main/resources/META-INF/services/org.apache.shardingsphere.infra.yaml.config.swapper.rule.YamlRuleConfigurationSwapper b/kernel/data-pipeline/cdc/core/src/main/resources/META-INF/services/org.apache.shardingsphere.infra.yaml.config.swapper.rule.YamlRuleConfigurationSwapper
new file mode 100644
index 00000000000..526ecb96c8c
--- /dev/null
+++ b/kernel/data-pipeline/cdc/core/src/main/resources/META-INF/services/org.apache.shardingsphere.infra.yaml.config.swapper.rule.YamlRuleConfigurationSwapper
@@ -0,0 +1,18 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+org.apache.shardingsphere.data.pipeline.cdc.yaml.swapper.YamlCDCRuleConfigurationSwapper
diff --git a/kernel/data-pipeline/cdc/protocol/pom.xml b/kernel/data-pipeline/cdc/protocol/pom.xml
index 58d17e6e585..7460d31d940 100644
--- a/kernel/data-pipeline/cdc/protocol/pom.xml
+++ b/kernel/data-pipeline/cdc/protocol/pom.xml
@@ -27,4 +27,45 @@
<artifactId>shardingsphere-data-pipeline-cdc-protocol</artifactId>
<name>${project.artifactId}</name>
+ <dependencies>
+ <dependency>
+ <groupId>com.google.protobuf</groupId>
+ <artifactId>protobuf-java</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>com.google.protobuf</groupId>
+ <artifactId>protobuf-java-util</artifactId>
+ </dependency>
+ </dependencies>
+
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.xolstice.maven.plugins</groupId>
+ <artifactId>protobuf-maven-plugin</artifactId>
+ <version>${protobuf-maven-plugin.version}</version>
+ <configuration>
+ <additionalProtoPathElements>
+ <additionalProtoPathElement>${project.basedir}/src/main/resources</additionalProtoPathElement>
+ </additionalProtoPathElements>
+ <protocArtifact>com.google.protobuf:protoc:${protobuf-java.version}:exe:${os.detected.classifier}</protocArtifact>
+ </configuration>
+ <executions>
+ <execution>
+ <goals>
+ <goal>compile</goal>
+ <goal>test-compile</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ <extensions>
+ <extension>
+ <groupId>kr.motd.maven</groupId>
+ <artifactId>os-maven-plugin</artifactId>
+ <version>${os-maven-plugin.version}</version>
+ </extension>
+ </extensions>
+ </build>
</project>
diff --git a/kernel/data-pipeline/cdc/protocol/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/common/CDCResponseErrorCode.java b/kernel/data-pipeline/cdc/protocol/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/common/CDCResponseErrorCode.java
new file mode 100644
index 00000000000..45d57900720
--- /dev/null
+++ b/kernel/data-pipeline/cdc/protocol/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/common/CDCResponseErrorCode.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.data.pipeline.cdc.common;
+
+import lombok.Getter;
+import lombok.RequiredArgsConstructor;
+
+/**
+ * CDC response error code.
+ */
+@RequiredArgsConstructor
+public enum CDCResponseErrorCode {
+
+ SERVER_ERROR("1"),
+
+ ILLEGAL_REQUEST_ERROR("2");
+
+ @Getter
+ private final String code;
+}
diff --git a/kernel/data-pipeline/cdc/protocol/src/main/proto/CDCRequestProtocol.proto b/kernel/data-pipeline/cdc/protocol/src/main/proto/CDCRequestProtocol.proto
new file mode 100644
index 00000000000..b26bcc9dd52
--- /dev/null
+++ b/kernel/data-pipeline/cdc/protocol/src/main/proto/CDCRequestProtocol.proto
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+syntax = "proto3";
+
+option java_multiple_files = true;
+option java_outer_classname = "CDCRequestProtocol";
+option java_package = "org.apache.shardingsphere.data.pipeline.cdc.protocol.request";
+
+message CDCRequest {
+ int32 version = 1;
+ string request_id = 2;
+ enum Type {
+ UNKNOWN = 0;
+ LOGIN = 1;
+ CREATE_SUBSCRIPTION = 2;
+ START_SUBSCRIPTION = 3;
+ STOP_SUBSCRIPTION = 4;
+ DROP_SUBSCRIPTION = 5;
+ FETCH_RECORDS = 6;
+ FETCH_RECORDS_ACK = 7;
+ }
+ Type type = 3;
+ oneof request {
+ LoginRequest login = 4;
+ CreateSubscriptionRequest create_subscription = 5;
+ StartSubscriptionRequest start_subscription = 6;
+ AckRequest ack_request = 7;
+ StopSubscriptionRequest stop_subscription = 8;
+ DropSubscriptionRequest drop_subscription = 9;
+ }
+}
+
+message LoginRequest {
+ enum LoginType {
+ UNKNOWN = 0;
+ BASIC = 1;
+ }
+ LoginType type = 1;
+ oneof body {
+ BasicBody basic_body = 2;
+ }
+
+ message BasicBody {
+ string username = 1;
+ string password = 2;
+ }
+}
+
+message CreateSubscriptionRequest {
+ string database = 1;
+ message TableName {
+ string schema = 1;
+ string name = 2;
+ }
+ repeated TableName tableNames = 2;
+ string subscriptionName = 3;
+ SubscriptionMode subscriptionMode = 4;
+
+ enum SubscriptionMode {
+ UNKNOWN = 0;
+ INCREMENTAL = 1;
+ FULL = 2;
+ }
+}
+
+message StartSubscriptionRequest {
+ string subscriptionName = 1;
+}
+
+message StopSubscriptionRequest {
+ string subscriptionName = 1;
+}
+
+message DropSubscriptionRequest {
+ string subscriptionName = 1;
+}
+
+message AckRequest {
+ string subscriptionName = 1;
+ string lastReceivedPosition = 2;
+}
diff --git a/kernel/data-pipeline/cdc/protocol/src/main/proto/CDCResponseProtocol.proto b/kernel/data-pipeline/cdc/protocol/src/main/proto/CDCResponseProtocol.proto
new file mode 100644
index 00000000000..daeb01d9bf2
--- /dev/null
+++ b/kernel/data-pipeline/cdc/protocol/src/main/proto/CDCResponseProtocol.proto
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+syntax = "proto3";
+
+import "google/protobuf/any.proto";
+
+option java_multiple_files = true;
+option java_outer_classname = "CDCResponseProtocol";
+option java_package = "org.apache.shardingsphere.data.pipeline.cdc.protocol.response";
+
+message CDCResponse {
+ string request_id = 1;
+ enum Status {
+ UNKNOWN = 0;
+ SUCCEED = 1;
+ FAILED = 2;
+ }
+ Status status = 2;
+ oneof response {
+ ServerGreetingResult server_greeting_result = 3;
+ CreateSubscriptionResult create_subscription_result = 4;
+ FetchRecordResult fetch_record_result = 5;
+ }
+
+ optional string error_code = 14;
+ optional string error_message = 15;
+}
+
+message ServerGreetingResult {
+ string server_version = 1;
+ string protocol_version = 2;
+}
+
+message CreateSubscriptionResult {
+ string subscriptionName = 1;
+ bool existing = 2;
+}
+
+message FetchRecordResult {
+ message Record {
+ map<string, google.protobuf.Any> before = 1;
+ map<string, google.protobuf.Any> after = 2;
+ message TableMetaData {
+ string database = 1;
+ optional string schema = 2;
+ string tableName = 3;
+ }
+ TableMetaData tableMetaData = 3;
+ int64 transaction_commit_millis = 4;
+ enum DataChangeType {
+ UNKNOWN = 0;
+ INSERT = 1;
+ UPDATE = 2;
+ DELETE = 3;
+ CREATE_TABLE = 4;
+ ALTER_TABLE = 5;
+ DROP_TABLE = 6;
+ CREATE_INDEX = 7;
+ ALTER_INDEX = 8;
+ DROP_INDEX = 9;
+ }
+ bool isDDL = 6;
+ optional string ddlSQL = 7;
+ }
+ repeated Record records = 1;
+ bool hasNext = 2;
+}
diff --git a/kernel/data-pipeline/cdc/protocol/src/main/resources/.gitkeep b/kernel/data-pipeline/cdc/protocol/src/main/resources/.gitkeep
new file mode 100644
index 00000000000..325be7c7905
--- /dev/null
+++ b/kernel/data-pipeline/cdc/protocol/src/main/resources/.gitkeep
@@ -0,0 +1,18 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# The protobuf maven plugin requires the existence of the resources directory, just keep it.
diff --git a/pom.xml b/pom.xml
index d0881f2fc14..bd930b317a8 100644
--- a/pom.xml
+++ b/pom.xml
@@ -121,6 +121,8 @@
<hamcrest.version>1.3</hamcrest.version>
<mockito.version>4.8.0</mockito.version>
+ <protobuf-java.version>3.17.1</protobuf-java.version>
+
<!-- Plugin versions -->
<apache-rat-plugin.version>0.15</apache-rat-plugin.version>
<maven-compiler-plugin.version>3.8.0</maven-compiler-plugin.version>
@@ -148,7 +150,8 @@
<maven-pmd-plugin.version>3.5</maven-pmd-plugin.version>
<jdepend-maven-plugin.version>2.0</jdepend-maven-plugin.version>
<taglist-maven-plugin.version>2.4</taglist-maven-plugin.version>
- <os-maven-plugin.version>1.5.0.Final</os-maven-plugin.version>
+ <os-maven-plugin.version>1.6.2</os-maven-plugin.version>
+ <protobuf-maven-plugin.version>0.6.1</protobuf-maven-plugin.version>
<dockerfile-maven.version>1.4.13</dockerfile-maven.version>
<docker-compose-maven-plugin.version>4.0.0</docker-compose-maven-plugin.version>
<checksum-maven-plugin.version>1.10</checksum-maven-plugin.version>
@@ -555,6 +558,17 @@
<artifactId>jul-to-slf4j</artifactId>
<version>${slf4j.version}</version>
</dependency>
+
+ <dependency>
+ <groupId>com.google.protobuf</groupId>
+ <artifactId>protobuf-java</artifactId>
+ <version>${protobuf-java.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>com.google.protobuf</groupId>
+ <artifactId>protobuf-java-util</artifactId>
+ <version>${protobuf-java.version}</version>
+ </dependency>
<dependency>
<groupId>org.projectlombok</groupId>
diff --git a/proxy/backend/pom.xml b/proxy/backend/pom.xml
index d89528bfe1f..1572fec64b3 100644
--- a/proxy/backend/pom.xml
+++ b/proxy/backend/pom.xml
@@ -203,6 +203,11 @@
<artifactId>shardingsphere-data-pipeline-distsql-handler</artifactId>
<version>${project.version}</version>
</dependency>
+ <dependency>
+ <groupId>org.apache.shardingsphere</groupId>
+ <artifactId>shardingsphere-data-pipeline-cdc-core</artifactId>
+ <version>${project.version}</version>
+ </dependency>
<dependency>
<groupId>org.apache.shardingsphere</groupId>
<artifactId>shardingsphere-sql-federation-core</artifactId>
diff --git a/proxy/backend/src/main/java/org/apache/shardingsphere/proxy/backend/config/ProxyConfigurationLoader.java b/proxy/backend/src/main/java/org/apache/shardingsphere/proxy/backend/config/ProxyConfigurationLoader.java
index 0ea805d8b9e..d4a8f03b256 100644
--- a/proxy/backend/src/main/java/org/apache/shardingsphere/proxy/backend/config/ProxyConfigurationLoader.java
+++ b/proxy/backend/src/main/java/org/apache/shardingsphere/proxy/backend/config/ProxyConfigurationLoader.java
@@ -97,6 +97,9 @@ public final class ProxyConfigurationLoader {
if (null != serverConfiguration.getTraffic()) {
serverConfiguration.getRules().add(serverConfiguration.getTraffic());
}
+ if (null != serverConfiguration.getCdc()) {
+ serverConfiguration.getRules().add(serverConfiguration.getCdc());
+ }
return serverConfiguration;
}
diff --git a/proxy/backend/src/main/java/org/apache/shardingsphere/proxy/backend/config/yaml/YamlProxyServerConfiguration.java b/proxy/backend/src/main/java/org/apache/shardingsphere/proxy/backend/config/yaml/YamlProxyServerConfiguration.java
index b3131836d43..f1d66d17178 100644
--- a/proxy/backend/src/main/java/org/apache/shardingsphere/proxy/backend/config/yaml/YamlProxyServerConfiguration.java
+++ b/proxy/backend/src/main/java/org/apache/shardingsphere/proxy/backend/config/yaml/YamlProxyServerConfiguration.java
@@ -20,6 +20,7 @@ package org.apache.shardingsphere.proxy.backend.config.yaml;
import lombok.Getter;
import lombok.Setter;
import org.apache.shardingsphere.authority.yaml.config.YamlAuthorityConfiguration;
+import org.apache.shardingsphere.data.pipeline.cdc.yaml.config.YamlCDCRuleConfiguration;
import org.apache.shardingsphere.infra.util.yaml.YamlConfiguration;
import org.apache.shardingsphere.infra.yaml.config.pojo.mode.YamlModeConfiguration;
import org.apache.shardingsphere.infra.yaml.config.pojo.rule.YamlRuleConfiguration;
@@ -51,6 +52,8 @@ public final class YamlProxyServerConfiguration implements YamlConfiguration {
private YamlTrafficRuleConfiguration traffic;
+ private YamlCDCRuleConfiguration cdc;
+
private Collection<YamlRuleConfiguration> rules = new LinkedList<>();
private Properties props = new Properties();
diff --git a/proxy/backend/src/test/java/org/apache/shardingsphere/proxy/backend/config/ProxyConfigurationLoaderTest.java b/proxy/backend/src/test/java/org/apache/shardingsphere/proxy/backend/config/ProxyConfigurationLoaderTest.java
index 6f1d50ac23a..2f77b9aa5fb 100644
--- a/proxy/backend/src/test/java/org/apache/shardingsphere/proxy/backend/config/ProxyConfigurationLoaderTest.java
+++ b/proxy/backend/src/test/java/org/apache/shardingsphere/proxy/backend/config/ProxyConfigurationLoaderTest.java
@@ -48,6 +48,7 @@ public final class ProxyConfigurationLoaderTest {
YamlProxyServerConfiguration serverConfig = actual.getServerConfiguration();
assertNull(serverConfig.getMode());
assertNull(serverConfig.getAuthority());
+ assertNull(serverConfig.getCdc());
assertNull(serverConfig.getLabels());
assertTrue(serverConfig.getProps().isEmpty());
assertTrue(serverConfig.getRules().isEmpty());
diff --git a/proxy/bootstrap/src/main/java/org/apache/shardingsphere/proxy/Bootstrap.java b/proxy/bootstrap/src/main/java/org/apache/shardingsphere/proxy/Bootstrap.java
index ef272b93003..483de595dae 100644
--- a/proxy/bootstrap/src/main/java/org/apache/shardingsphere/proxy/Bootstrap.java
+++ b/proxy/bootstrap/src/main/java/org/apache/shardingsphere/proxy/Bootstrap.java
@@ -24,6 +24,7 @@ import org.apache.shardingsphere.infra.config.props.ConfigurationPropertyKey;
import org.apache.shardingsphere.proxy.arguments.BootstrapArguments;
import org.apache.shardingsphere.proxy.backend.config.ProxyConfigurationLoader;
import org.apache.shardingsphere.proxy.backend.config.YamlProxyConfiguration;
+import org.apache.shardingsphere.proxy.frontend.CDCServer;
import org.apache.shardingsphere.proxy.frontend.ShardingSphereProxy;
import org.apache.shardingsphere.proxy.initializer.BootstrapInitializer;
@@ -50,6 +51,10 @@ public final class Bootstrap {
int port = bootstrapArgs.getPort().orElseGet(() -> new ConfigurationProperties(yamlConfig.getServerConfiguration().getProps()).getValue(ConfigurationPropertyKey.PROXY_DEFAULT_PORT));
List<String> addresses = bootstrapArgs.getAddresses();
new BootstrapInitializer().init(yamlConfig, port, bootstrapArgs.getForce());
+ boolean cdcEnabled = null != yamlConfig.getServerConfiguration().getCdc() && yamlConfig.getServerConfiguration().getCdc().isEnabled();
+ if (cdcEnabled) {
+ new CDCServer(addresses, yamlConfig.getServerConfiguration().getCdc().getPort()).start();
+ }
new ShardingSphereProxy().start(port, addresses);
}
}
diff --git a/proxy/bootstrap/src/main/resources/conf/server.yaml b/proxy/bootstrap/src/main/resources/conf/server.yaml
index cbae3f3e249..166959eaa3c 100644
--- a/proxy/bootstrap/src/main/resources/conf/server.yaml
+++ b/proxy/bootstrap/src/main/resources/conf/server.yaml
@@ -55,6 +55,10 @@
# initialCapacity: 128
# maximumSize: 1024
#
+#cdc:
+# enabled: false
+# port: 33071
+#
#props:
# max-connections-size-per-query: 1
# kernel-executor-size: 16 # Infinite by default.
diff --git a/proxy/frontend/core/src/main/java/org/apache/shardingsphere/proxy/frontend/CDCServer.java b/proxy/frontend/core/src/main/java/org/apache/shardingsphere/proxy/frontend/CDCServer.java
new file mode 100644
index 00000000000..953de6168e4
--- /dev/null
+++ b/proxy/frontend/core/src/main/java/org/apache/shardingsphere/proxy/frontend/CDCServer.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.proxy.frontend;
+
+import io.netty.bootstrap.ServerBootstrap;
+import io.netty.buffer.PooledByteBufAllocator;
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelOption;
+import io.netty.channel.EventLoopGroup;
+import io.netty.channel.WriteBufferWaterMark;
+import io.netty.channel.epoll.Epoll;
+import io.netty.channel.epoll.EpollEventLoopGroup;
+import io.netty.channel.epoll.EpollServerSocketChannel;
+import io.netty.channel.nio.NioEventLoopGroup;
+import io.netty.channel.socket.nio.NioServerSocketChannel;
+import io.netty.handler.logging.LogLevel;
+import io.netty.handler.logging.LoggingHandler;
+import lombok.RequiredArgsConstructor;
+import lombok.SneakyThrows;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.shardingsphere.proxy.frontend.netty.CDCServerHandlerInitializer;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * CDC server.
+ */
+@Slf4j
+@RequiredArgsConstructor
+public final class CDCServer extends Thread {
+
+ private final List<String> addressed;
+
+ private final int port;
+
+ private EventLoopGroup bossGroup;
+
+ private EventLoopGroup workerGroup;
+
+ @Override
+ @SneakyThrows(InterruptedException.class)
+ public void run() {
+ try {
+ List<ChannelFuture> futures = startInternal(addressed, port);
+ for (ChannelFuture each : futures) {
+ each.channel().closeFuture().sync();
+ }
+ } finally {
+ close();
+ }
+ }
+
+ private List<ChannelFuture> startInternal(final List<String> addresses, final int port) throws InterruptedException {
+ createEventLoopGroup();
+ ServerBootstrap bootstrap = new ServerBootstrap();
+ bootstrap.channel(Epoll.isAvailable() ? EpollServerSocketChannel.class : NioServerSocketChannel.class)
+ .group(bossGroup, workerGroup)
+ .option(ChannelOption.SO_REUSEADDR, true)
+ .option(ChannelOption.WRITE_BUFFER_WATER_MARK, new WriteBufferWaterMark(8 * 1024 * 1024, 16 * 1024 * 1024))
+ .childOption(ChannelOption.SO_KEEPALIVE, true)
+ .childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
+ .childOption(ChannelOption.TCP_NODELAY, true)
+ .handler(new LoggingHandler(LogLevel.INFO))
+ .childHandler(new CDCServerHandlerInitializer());
+ List<ChannelFuture> futures = new ArrayList<>();
+ for (String address : addresses) {
+ futures.add(bootstrap.bind(address, port).sync());
+ }
+ return futures;
+ }
+
+ private void createEventLoopGroup() {
+ bossGroup = Epoll.isAvailable() ? new EpollEventLoopGroup(1) : new NioEventLoopGroup(1);
+ workerGroup = Epoll.isAvailable() ? new EpollEventLoopGroup() : new NioEventLoopGroup();
+ }
+
+ private void close() {
+ if (null != bossGroup) {
+ bossGroup.shutdownGracefully();
+ }
+ if (null != workerGroup) {
+ workerGroup.shutdownGracefully();
+ }
+ }
+}
diff --git a/proxy/frontend/core/src/main/java/org/apache/shardingsphere/proxy/frontend/netty/CDCChannelInboundHandler.java b/proxy/frontend/core/src/main/java/org/apache/shardingsphere/proxy/frontend/netty/CDCChannelInboundHandler.java
new file mode 100644
index 00000000000..c52ff9a1f2a
--- /dev/null
+++ b/proxy/frontend/core/src/main/java/org/apache/shardingsphere/proxy/frontend/netty/CDCChannelInboundHandler.java
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.proxy.frontend.netty;
+
+import com.google.common.hash.Hashing;
+import io.netty.channel.ChannelFutureListener;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelInboundHandlerAdapter;
+import io.netty.util.AttributeKey;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.shardingsphere.authority.rule.AuthorityRule;
+import org.apache.shardingsphere.data.pipeline.cdc.common.CDCResponseErrorCode;
+import org.apache.shardingsphere.data.pipeline.cdc.constant.CDCConnectionStatus;
+import org.apache.shardingsphere.data.pipeline.cdc.context.CDCConnectionContext;
+import org.apache.shardingsphere.data.pipeline.cdc.generator.CDCResponseGenerator;
+import org.apache.shardingsphere.data.pipeline.cdc.protocol.request.CDCRequest;
+import org.apache.shardingsphere.data.pipeline.cdc.protocol.request.LoginRequest.BasicBody;
+import org.apache.shardingsphere.data.pipeline.cdc.protocol.response.CDCResponse;
+import org.apache.shardingsphere.data.pipeline.cdc.protocol.response.CreateSubscriptionResult;
+import org.apache.shardingsphere.data.pipeline.cdc.protocol.response.ServerGreetingResult;
+import org.apache.shardingsphere.infra.autogen.version.ShardingSphereVersion;
+import org.apache.shardingsphere.infra.metadata.user.Grantee;
+import org.apache.shardingsphere.infra.metadata.user.ShardingSphereUser;
+import org.apache.shardingsphere.infra.rule.ShardingSphereRule;
+import org.apache.shardingsphere.proxy.backend.context.ProxyContext;
+
+import java.net.InetSocketAddress;
+import java.net.SocketAddress;
+import java.util.Collection;
+import java.util.Objects;
+import java.util.Optional;
+
+/**
+ * CDC channel inbound handler.
+ */
+@Slf4j
+public final class CDCChannelInboundHandler extends ChannelInboundHandlerAdapter {
+
+ private static final AttributeKey<CDCConnectionContext> CONNECTION_CONTEXT_KEY = AttributeKey.valueOf("connection.context");
+
+ @Override
+ public void channelActive(final ChannelHandlerContext ctx) {
+ CDCConnectionContext context = new CDCConnectionContext();
+ context.setStatus(CDCConnectionStatus.NOT_LOGGED_IN);
+ ctx.channel().attr(CONNECTION_CONTEXT_KEY).setIfAbsent(context);
+ CDCResponse response = CDCResponse.newBuilder().setServerGreetingResult(ServerGreetingResult.newBuilder().setServerVersion(ShardingSphereVersion.VERSION).setProtocolVersion("1")
+ .build()).build();
+ ctx.writeAndFlush(response);
+ }
+
+ @Override
+ public void channelRead(final ChannelHandlerContext ctx, final Object msg) {
+ CDCConnectionContext connectionContext = ctx.channel().attr(CONNECTION_CONTEXT_KEY).get();
+ CDCConnectionStatus status = connectionContext.getStatus();
+ CDCRequest request = (CDCRequest) msg;
+ if (CDCConnectionStatus.NOT_LOGGED_IN == status) {
+ processLogin(ctx, request);
+ return;
+ }
+ switch (request.getRequestCase()) {
+ case CREATE_SUBSCRIPTION:
+ processCreateSubscription(ctx, request);
+ break;
+ case START_SUBSCRIPTION:
+ processStartSubscription(ctx, request, connectionContext);
+ break;
+ case STOP_SUBSCRIPTION:
+ stopStartSubscription(ctx, request, connectionContext);
+ break;
+ case DROP_SUBSCRIPTION:
+ dropStartSubscription(ctx, request);
+ break;
+ case ACK_REQUEST:
+ break;
+ default:
+ log.warn("Cannot handle this type of request {}", request);
+ }
+ }
+
+ private void processLogin(final ChannelHandlerContext ctx, final CDCRequest request) {
+ if (!request.hasLogin() || !request.getLogin().hasBasicBody()) {
+ ctx.writeAndFlush(CDCResponseGenerator.failed(request.getRequestId(), CDCResponseErrorCode.ILLEGAL_REQUEST_ERROR, "Miss login request body")).addListener(ChannelFutureListener.CLOSE);
+ return;
+ }
+ BasicBody body = request.getLogin().getBasicBody();
+ Collection<ShardingSphereRule> globalRules = ProxyContext.getInstance().getContextManager().getMetaDataContexts().getMetaData().getGlobalRuleMetaData().getRules();
+ Optional<AuthorityRule> authorityRule = globalRules.stream().filter(rule -> rule instanceof AuthorityRule).map(rule -> (AuthorityRule) rule).findFirst();
+ if (!authorityRule.isPresent()) {
+ ctx.writeAndFlush(CDCResponseGenerator.failed(request.getRequestId(), CDCResponseErrorCode.SERVER_ERROR, "Not find authority rule")).addListener(ChannelFutureListener.CLOSE);
+ return;
+ }
+ Optional<ShardingSphereUser> user = authorityRule.get().findUser(new Grantee(body.getUsername(), getHostAddress(ctx)));
+ if (user.isPresent() && Objects.equals(Hashing.sha256().hashBytes(user.get().getPassword().getBytes()).toString().toUpperCase(), body.getPassword())) {
+ ctx.channel().attr(CONNECTION_CONTEXT_KEY).get().setStatus(CDCConnectionStatus.LOGGED_IN);
+ ctx.writeAndFlush(CDCResponseGenerator.succeedBuilder(request.getRequestId()).build());
+ return;
+ }
+ ctx.writeAndFlush(CDCResponseGenerator.failed(request.getRequestId(), CDCResponseErrorCode.SERVER_ERROR, "Incorrect username or password")).addListener(ChannelFutureListener.CLOSE);
+ }
+
+ private String getHostAddress(final ChannelHandlerContext context) {
+ SocketAddress socketAddress = context.channel().remoteAddress();
+ return socketAddress instanceof InetSocketAddress ? ((InetSocketAddress) socketAddress).getAddress().getHostAddress() : socketAddress.toString();
+ }
+
+ private void processCreateSubscription(final ChannelHandlerContext ctx, final CDCRequest request) {
+ if (!request.hasCreateSubscription()) {
+ ctx.writeAndFlush(CDCResponseGenerator.failed(request.getRequestId(), CDCResponseErrorCode.ILLEGAL_REQUEST_ERROR, "Miss create subscription request body"))
+ .addListener(ChannelFutureListener.CLOSE);
+ return;
+ }
+ // TODO waiting for pipeline refactoring finished
+ CreateSubscriptionResult subscriptionResult = CreateSubscriptionResult.newBuilder().setSubscriptionName(request.getCreateSubscription().getSubscriptionName()).build();
+ ctx.writeAndFlush(CDCResponseGenerator.succeedBuilder(request.getRequestId()).setCreateSubscriptionResult(subscriptionResult).build());
+ }
+
+ private void processStartSubscription(final ChannelHandlerContext ctx, final CDCRequest request, final CDCConnectionContext connectionContext) {
+ // TODO waiting for pipeline refactoring finished
+ connectionContext.setStatus(CDCConnectionStatus.SUBSCRIBED);
+ ctx.writeAndFlush(CDCResponseGenerator.succeedBuilder(request.getRequestId()).build());
+ }
+
+ private void stopStartSubscription(final ChannelHandlerContext ctx, final CDCRequest request, final CDCConnectionContext connectionContext) {
+ // TODO waiting for pipeline refactoring finished
+ connectionContext.setStatus(CDCConnectionStatus.LOGGED_IN);
+ ctx.writeAndFlush(CDCResponseGenerator.succeedBuilder(request.getRequestId()).build());
+ }
+
+ private void dropStartSubscription(final ChannelHandlerContext ctx, final CDCRequest request) {
+ // TODO waiting for pipeline refactoring finished
+ ctx.writeAndFlush(CDCResponseGenerator.succeedBuilder(request.getRequestId()).build());
+ }
+}
diff --git a/proxy/frontend/core/src/main/java/org/apache/shardingsphere/proxy/frontend/netty/CDCServerHandlerInitializer.java b/proxy/frontend/core/src/main/java/org/apache/shardingsphere/proxy/frontend/netty/CDCServerHandlerInitializer.java
new file mode 100644
index 00000000000..cc108b805f2
--- /dev/null
+++ b/proxy/frontend/core/src/main/java/org/apache/shardingsphere/proxy/frontend/netty/CDCServerHandlerInitializer.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.proxy.frontend.netty;
+
+import io.netty.channel.ChannelInitializer;
+import io.netty.channel.socket.SocketChannel;
+import io.netty.handler.codec.protobuf.ProtobufDecoder;
+import io.netty.handler.codec.protobuf.ProtobufEncoder;
+import io.netty.handler.codec.protobuf.ProtobufVarint32FrameDecoder;
+import io.netty.handler.codec.protobuf.ProtobufVarint32LengthFieldPrepender;
+import org.apache.shardingsphere.data.pipeline.cdc.protocol.request.CDCRequest;
+
+/**
+ * CDC server handler initializer.
+ */
+public final class CDCServerHandlerInitializer extends ChannelInitializer<SocketChannel> {
+
+ @Override
+ protected void initChannel(final SocketChannel channel) {
+ channel.pipeline().addLast(new ProtobufVarint32FrameDecoder());
+ channel.pipeline().addLast(new ProtobufDecoder(CDCRequest.getDefaultInstance()));
+ channel.pipeline().addLast(new ProtobufVarint32LengthFieldPrepender());
+ channel.pipeline().addLast(new ProtobufEncoder());
+ channel.pipeline().addLast(new CDCChannelInboundHandler());
+ }
+}
diff --git a/proxy/frontend/core/src/test/java/org/apache/shardingsphere/proxy/frontend/netty/CDCChannelInboundHandlerTest.java b/proxy/frontend/core/src/test/java/org/apache/shardingsphere/proxy/frontend/netty/CDCChannelInboundHandlerTest.java
new file mode 100644
index 00000000000..845a270356f
--- /dev/null
+++ b/proxy/frontend/core/src/test/java/org/apache/shardingsphere/proxy/frontend/netty/CDCChannelInboundHandlerTest.java
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.proxy.frontend.netty;
+
+import com.google.common.hash.Hashing;
+import io.netty.channel.embedded.EmbeddedChannel;
+import io.netty.handler.logging.LoggingHandler;
+import org.apache.shardingsphere.authority.rule.AuthorityRule;
+import org.apache.shardingsphere.data.pipeline.cdc.common.CDCResponseErrorCode;
+import org.apache.shardingsphere.data.pipeline.cdc.protocol.request.CDCRequest;
+import org.apache.shardingsphere.data.pipeline.cdc.protocol.request.CDCRequest.Builder;
+import org.apache.shardingsphere.data.pipeline.cdc.protocol.request.LoginRequest;
+import org.apache.shardingsphere.data.pipeline.cdc.protocol.request.LoginRequest.BasicBody;
+import org.apache.shardingsphere.data.pipeline.cdc.protocol.response.CDCResponse;
+import org.apache.shardingsphere.data.pipeline.cdc.protocol.response.CDCResponse.Status;
+import org.apache.shardingsphere.infra.metadata.database.rule.ShardingSphereRuleMetaData;
+import org.apache.shardingsphere.infra.metadata.user.Grantee;
+import org.apache.shardingsphere.infra.metadata.user.ShardingSphereUser;
+import org.apache.shardingsphere.infra.rule.ShardingSphereRule;
+import org.apache.shardingsphere.proxy.backend.context.ProxyContext;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.mockito.MockedStatic;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Optional;
+
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.RETURNS_DEEP_STUBS;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.mockStatic;
+import static org.mockito.Mockito.when;
+
+public final class CDCChannelInboundHandlerTest {
+
+ private static MockedStatic<ProxyContext> proxyContext;
+
+ private final CDCChannelInboundHandler cdcChannelInboundHandler = new CDCChannelInboundHandler();
+
+ private EmbeddedChannel channel;
+
+ @BeforeClass
+ public static void beforeClass() {
+ proxyContext = mockStatic(ProxyContext.class);
+ ProxyContext mockedProxyContext = mock(ProxyContext.class, RETURNS_DEEP_STUBS);
+ proxyContext.when(ProxyContext::getInstance).thenReturn(mockedProxyContext);
+ ShardingSphereRuleMetaData globalRuleMetaData = mock(ShardingSphereRuleMetaData.class);
+ when(mockedProxyContext.getContextManager().getMetaDataContexts().getMetaData().getGlobalRuleMetaData()).thenReturn(globalRuleMetaData);
+ List<ShardingSphereRule> rules = Collections.singletonList(mockAuthRule());
+ when(globalRuleMetaData.getRules()).thenReturn(rules);
+ }
+
+ private static AuthorityRule mockAuthRule() {
+ AuthorityRule result = mock(AuthorityRule.class);
+ ShardingSphereUser mockUser = mock(ShardingSphereUser.class);
+ when(mockUser.getGrantee()).thenReturn(new Grantee("root", "%"));
+ when(mockUser.getPassword()).thenReturn("root");
+ when(result.findUser(any())).thenReturn(Optional.of(mockUser));
+ return result;
+ }
+
+ @AfterClass
+ public static void afterClass() {
+ proxyContext.close();
+ }
+
+ @Before
+ public void setup() {
+ channel = new EmbeddedChannel(new LoggingHandler(), cdcChannelInboundHandler);
+ }
+
+ @Test
+ public void assertLoginRequestFailed() {
+ CDCRequest actualRequest = CDCRequest.newBuilder().setLogin(LoginRequest.newBuilder().setBasicBody(BasicBody.newBuilder().setUsername("root2").build()).build()).build();
+ channel.writeInbound(actualRequest);
+ CDCResponse expectedGreetingResult = channel.readOutbound();
+ assertTrue(expectedGreetingResult.hasServerGreetingResult());
+ CDCResponse expectedLoginResult = channel.readOutbound();
+ assertThat(expectedLoginResult.getStatus(), is(Status.FAILED));
+ assertThat(expectedLoginResult.getErrorCode(), is(CDCResponseErrorCode.SERVER_ERROR.getCode()));
+ assertFalse(channel.isOpen());
+ }
+
+ @Test
+ public void assertIllegalLoginRequest() {
+ CDCRequest actualRequest = CDCRequest.newBuilder().setVersion(1).setRequestId("test").build();
+ channel.writeInbound(actualRequest);
+ CDCResponse expectedGreetingResult = channel.readOutbound();
+ assertTrue(expectedGreetingResult.hasServerGreetingResult());
+ CDCResponse expectedLoginResult = channel.readOutbound();
+ assertThat(expectedLoginResult.getStatus(), is(Status.FAILED));
+ assertThat(expectedLoginResult.getErrorCode(), is(CDCResponseErrorCode.ILLEGAL_REQUEST_ERROR.getCode()));
+ assertFalse(channel.isOpen());
+ }
+
+ @Test
+ public void assertLoginRequestSucceed() {
+ String encryptPassword = Hashing.sha256().hashBytes("root".getBytes()).toString().toUpperCase();
+ Builder builder = CDCRequest.newBuilder().setLogin(LoginRequest.newBuilder().setBasicBody(BasicBody.newBuilder().setUsername("root").setPassword(encryptPassword).build()).build());
+ CDCRequest actualRequest = builder.build();
+ channel.writeInbound(actualRequest);
+ CDCResponse expectedGreetingResult = channel.readOutbound();
+ assertTrue(expectedGreetingResult.hasServerGreetingResult());
+ CDCResponse expectedLoginResult = channel.readOutbound();
+ assertThat(expectedLoginResult.getStatus(), is(Status.SUCCEED));
+ assertThat(expectedLoginResult.getErrorCode(), is(""));
+ assertThat(expectedLoginResult.getErrorMessage(), is(""));
+ }
+}
diff --git a/proxy/frontend/core/src/test/java/org/apache/shardingsphere/proxy/frontend/netty/CDCServerHandlerInitializerTest.java b/proxy/frontend/core/src/test/java/org/apache/shardingsphere/proxy/frontend/netty/CDCServerHandlerInitializerTest.java
new file mode 100644
index 00000000000..bb6566c5d0d
--- /dev/null
+++ b/proxy/frontend/core/src/test/java/org/apache/shardingsphere/proxy/frontend/netty/CDCServerHandlerInitializerTest.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.proxy.frontend.netty;
+
+import io.netty.channel.ChannelPipeline;
+import io.netty.channel.socket.SocketChannel;
+import io.netty.handler.codec.protobuf.ProtobufDecoder;
+import io.netty.handler.codec.protobuf.ProtobufEncoder;
+import io.netty.handler.codec.protobuf.ProtobufVarint32FrameDecoder;
+import io.netty.handler.codec.protobuf.ProtobufVarint32LengthFieldPrepender;
+import org.junit.Test;
+
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+public final class CDCServerHandlerInitializerTest {
+
+ @Test
+ public void assertInitChannel() {
+ SocketChannel channel = mock(SocketChannel.class);
+ ChannelPipeline pipeline = mock(ChannelPipeline.class);
+ when(channel.pipeline()).thenReturn(pipeline);
+ CDCServerHandlerInitializer initializer = new CDCServerHandlerInitializer();
+ initializer.initChannel(channel);
+ verify(pipeline).addLast(any(ProtobufVarint32FrameDecoder.class));
+ verify(pipeline).addLast(any(ProtobufDecoder.class));
+ verify(pipeline).addLast(any(ProtobufVarint32LengthFieldPrepender.class));
+ verify(pipeline).addLast(any(ProtobufEncoder.class));
+ verify(pipeline).addLast(any(CDCChannelInboundHandler.class));
+ }
+}