Posted to notifications@shardingsphere.apache.org by GitBox <gi...@apache.org> on 2022/12/08 03:09:33 UTC

[GitHub] [shardingsphere] azexcy opened a new pull request, #22739: Add CDC create subscription event and listener

azexcy opened a new pull request, #22739:
URL: https://github.com/apache/shardingsphere/pull/22739

   Related #22500 
   
   Changes proposed in this pull request:
     - Add CDC create subscription event and listener
     - Add unit test
     - Add junit dependency
   
   ---
   
   Before committing this PR, I'm sure that I have checked the following options:
   - [ ] My code follows the [code of conduct](https://shardingsphere.apache.org/community/en/involved/conduct/code/) of this project.
   - [ ] I have self-reviewed the commit code.
   - [ ] I have (or in comment I request) added corresponding labels for the pull request.
   - [ ] I have passed maven check locally : `./mvnw clean install -B -T1C -Dmaven.javadoc.skip -Dmaven.jacoco.skip -e`.
   - [ ] I have made corresponding changes to the documentation.
   - [ ] I have added corresponding unit tests for my changes.
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@shardingsphere.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [shardingsphere] sandynz commented on a diff in pull request #22739: Add CDC create subscription event and listener

Posted by GitBox <gi...@apache.org>.
sandynz commented on code in PR #22739:
URL: https://github.com/apache/shardingsphere/pull/22739#discussion_r1043214688


##########
kernel/data-pipeline/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/parameter/StartCDCClientParameter.java:
##########
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.data.pipeline.cdc.client.parameter;
+
+import lombok.Getter;
+import lombok.Setter;
+import org.apache.shardingsphere.data.pipeline.cdc.protocol.request.CreateSubscriptionRequest.SubscriptionMode;
+import org.apache.shardingsphere.data.pipeline.cdc.protocol.request.CreateSubscriptionRequest.TableName;
+
+import java.util.List;
+
+/**
+ * Start CDC client parameter.
+ */
+@Getter
+@Setter
+public final class StartCDCClientParameter {
+    
+    private String address = "localhost";
+    
+    private int port = 33071;
+    
+    private String username = "root";
+    
+    private String password = "root";

Review Comment:
   It's better not to set default values for these fields.
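
One way to read this suggestion, as a minimal sketch: callers supply every connection value explicitly, so nothing silently falls back to `localhost` or `root`. The class name `CdcClientParameterSketch` and its constructor are hypothetical and not part of the PR. The PR's class relies on Lombok `@Getter`/`@Setter`; plain Java is used here only to keep the block self-contained.

```java
import java.util.Objects;

/**
 * Hypothetical stand-in for StartCDCClientParameter without default values.
 */
final class CdcClientParameterSketch {
    
    private final String address;
    
    private final int port;
    
    private final String username;
    
    private final String password;
    
    CdcClientParameterSketch(final String address, final int port, final String username, final String password) {
        // Fail fast on missing values instead of silently applying defaults.
        this.address = Objects.requireNonNull(address, "address is required");
        this.port = port;
        this.username = Objects.requireNonNull(username, "username is required");
        this.password = Objects.requireNonNull(password, "password is required");
    }
    
    String getAddress() {
        return address;
    }
    
    int getPort() {
        return port;
    }
    
    String getUsername() {
        return username;
    }
    
    String getPassword() {
        return password;
    }
}
```

With this shape, a missing value surfaces at construction time rather than as a connection attempt against an unintended host.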



##########
kernel/data-pipeline/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/exception/InvalidStartCDCClientParameterException.java:
##########
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.data.pipeline.cdc.client.exception;
+
+public final class InvalidStartCDCClientParameterException extends Exception {

Review Comment:
   Class javadoc is required



##########
kernel/data-pipeline/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/exception/InvalidStartCDCClientParameterException.java:
##########
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.data.pipeline.cdc.client.exception;
+
+public final class InvalidStartCDCClientParameterException extends Exception {

Review Comment:
   Could we remove it and just use `IllegalArgumentException`?
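
A minimal sketch of what that could look like, assuming the custom exception was only used to reject bad client parameters. The class `CdcClientParameterValidatorSketch`, its `validate` method, and the specific checks are hypothetical and do not come from the PR. Because `IllegalArgumentException` is unchecked, callers would no longer need a `throws` clause, unlike with the checked `InvalidStartCDCClientParameterException`.

```java
/**
 * Hypothetical validator that reports invalid parameters with IllegalArgumentException.
 */
final class CdcClientParameterValidatorSketch {
    
    private CdcClientParameterValidatorSketch() {
    }
    
    static void validate(final String address, final int port) {
        // Reject a missing or blank address.
        if (null == address || address.trim().isEmpty()) {
            throw new IllegalArgumentException("address must not be empty");
        }
        // Reject ports outside the valid TCP range.
        if (port < 1 || port > 65535) {
            throw new IllegalArgumentException("port must be between 1 and 65535, but was " + port);
        }
    }
}
```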





[GitHub] [shardingsphere] codecov-commenter commented on pull request #22739: Add CDC create subscription event and listener

Posted by GitBox <gi...@apache.org>.
codecov-commenter commented on PR #22739:
URL: https://github.com/apache/shardingsphere/pull/22739#issuecomment-1341945076

   # [Codecov](https://codecov.io/gh/apache/shardingsphere/pull/22739?src=pr&el=h1&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) Report
   > Merging [#22739](https://codecov.io/gh/apache/shardingsphere/pull/22739?src=pr&el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (52aca55) into [master](https://codecov.io/gh/apache/shardingsphere/commit/9f94090137cdc4b325901d624d90a135f01adad1?el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (9f94090) will **decrease** coverage by `0.09%`.
   > The diff coverage is `10.52%`.
   
   ```diff
   @@             Coverage Diff              @@
   ##             master   #22739      +/-   ##
   ============================================
   - Coverage     49.39%   49.30%   -0.10%     
     Complexity     2443     2443              
   ============================================
     Files          4107     4121      +14     
     Lines         57476    57627     +151     
     Branches       9847     9866      +19     
   ============================================
   + Hits          28393    28412      +19     
   - Misses        26643    26773     +130     
   - Partials       2440     2442       +2     
   ```
   
   
   | [Impacted Files](https://codecov.io/gh/apache/shardingsphere/pull/22739?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) | Coverage Δ | |
   |---|---|---|
   | [...cdc/client/handler/SubscriptionRequestHandler.java](https://codecov.io/gh/apache/shardingsphere/pull/22739/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-a2VybmVsL2RhdGEtcGlwZWxpbmUvY2RjL2NsaWVudC9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvc2hhcmRpbmdzcGhlcmUvZGF0YS9waXBlbGluZS9jZGMvY2xpZW50L2hhbmRsZXIvU3Vic2NyaXB0aW9uUmVxdWVzdEhhbmRsZXIuamF2YQ==) | `0.00% <0.00%> (ø)` | |
   | [...sphere/data/pipeline/cdc/api/CDCJobAPIFactory.java](https://codecov.io/gh/apache/shardingsphere/pull/22739/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-a2VybmVsL2RhdGEtcGlwZWxpbmUvY2RjL2NvcmUvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL3NoYXJkaW5nc3BoZXJlL2RhdGEvcGlwZWxpbmUvY2RjL2FwaS9DRENKb2JBUElGYWN0b3J5LmphdmE=) | `0.00% <0.00%> (ø)` | |
   | [...here/data/pipeline/cdc/api/impl/CDCJobAPIImpl.java](https://codecov.io/gh/apache/shardingsphere/pull/22739/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-a2VybmVsL2RhdGEtcGlwZWxpbmUvY2RjL2NvcmUvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL3NoYXJkaW5nc3BoZXJlL2RhdGEvcGlwZWxpbmUvY2RjL2FwaS9pbXBsL0NEQ0pvYkFQSUltcGwuamF2YQ==) | `0.00% <0.00%> (ø)` | |
   | [...a/pipeline/cdc/config/job/CDCJobConfiguration.java](https://codecov.io/gh/apache/shardingsphere/pull/22739/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-a2VybmVsL2RhdGEtcGlwZWxpbmUvY2RjL2NvcmUvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL3NoYXJkaW5nc3BoZXJlL2RhdGEvcGlwZWxpbmUvY2RjL2NvbmZpZy9qb2IvQ0RDSm9iQ29uZmlndXJhdGlvbi5qYXZh) | `0.00% <0.00%> (ø)` | |
   | [...rdingsphere/data/pipeline/cdc/core/job/CDCJob.java](https://codecov.io/gh/apache/shardingsphere/pull/22739/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-a2VybmVsL2RhdGEtcGlwZWxpbmUvY2RjL2NvcmUvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL3NoYXJkaW5nc3BoZXJlL2RhdGEvcGlwZWxpbmUvY2RjL2NvcmUvam9iL0NEQ0pvYi5qYXZh) | `0.00% <0.00%> (ø)` | |
   | [...e/listener/CDCContextManagerLifecycleListener.java](https://codecov.io/gh/apache/shardingsphere/pull/22739/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-a2VybmVsL2RhdGEtcGlwZWxpbmUvY2RjL2NvcmUvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL3NoYXJkaW5nc3BoZXJlL2RhdGEvcGlwZWxpbmUvY2RjL2NvcmUvbGlzdGVuZXIvQ0RDQ29udGV4dE1hbmFnZXJMaWZlY3ljbGVMaXN0ZW5lci5qYXZh) | `0.00% <0.00%> (ø)` | |
   | [...processor/CDCJobConfigurationChangedProcessor.java](https://codecov.io/gh/apache/shardingsphere/pull/22739/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-a2VybmVsL2RhdGEtcGlwZWxpbmUvY2RjL2NvcmUvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL3NoYXJkaW5nc3BoZXJlL2RhdGEvcGlwZWxpbmUvY2RjL2NvcmUvbWV0YWRhdGEvcHJvY2Vzc29yL0NEQ0pvYkNvbmZpZ3VyYXRpb25DaGFuZ2VkUHJvY2Vzc29yLmphdmE=) | `0.00% <0.00%> (ø)` | |
   | [...e/cdc/core/subscribe/CDCSubscriptionSubscribe.java](https://codecov.io/gh/apache/shardingsphere/pull/22739/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-a2VybmVsL2RhdGEtcGlwZWxpbmUvY2RjL2NvcmUvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL3NoYXJkaW5nc3BoZXJlL2RhdGEvcGlwZWxpbmUvY2RjL2NvcmUvc3Vic2NyaWJlL0NEQ1N1YnNjcmlwdGlvblN1YnNjcmliZS5qYXZh) | `0.00% <0.00%> (ø)` | |
   | [...pipeline/cdc/yaml/job/YamlCDCJobConfiguration.java](https://codecov.io/gh/apache/shardingsphere/pull/22739/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-a2VybmVsL2RhdGEtcGlwZWxpbmUvY2RjL2NvcmUvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL3NoYXJkaW5nc3BoZXJlL2RhdGEvcGlwZWxpbmUvY2RjL3lhbWwvam9iL1lhbWxDRENKb2JDb25maWd1cmF0aW9uLmphdmE=) | `0.00% <0.00%> (ø)` | |
   | [...e/cdc/yaml/job/YamlCDCJobConfigurationSwapper.java](https://codecov.io/gh/apache/shardingsphere/pull/22739/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-a2VybmVsL2RhdGEtcGlwZWxpbmUvY2RjL2NvcmUvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL3NoYXJkaW5nc3BoZXJlL2RhdGEvcGlwZWxpbmUvY2RjL3lhbWwvam9iL1lhbWxDRENKb2JDb25maWd1cmF0aW9uU3dhcHBlci5qYXZh) | `0.00% <0.00%> (ø)` | |
   | ... and [8 more](https://codecov.io/gh/apache/shardingsphere/pull/22739/diff?src=pr&el=tree-more&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) | |
   
   




[GitHub] [shardingsphere] azexcy commented on a diff in pull request #22739: Add CDC create subscription event and listener

Posted by GitBox <gi...@apache.org>.
azexcy commented on code in PR #22739:
URL: https://github.com/apache/shardingsphere/pull/22739#discussion_r1043117335


##########
kernel/data-pipeline/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/handler/SubscriptionRequestHandler.java:
##########
@@ -47,8 +47,8 @@ public void userEventTriggered(final ChannelHandlerContext ctx, final Object evt
     
     private CreateSubscriptionRequest buildCreateSubscriptionRequest() {

Review Comment:
   Use parameter instead of





[GitHub] [shardingsphere] sandynz commented on a diff in pull request #22739: Add CDC create subscription event and listener

Posted by GitBox <gi...@apache.org>.
sandynz commented on code in PR #22739:
URL: https://github.com/apache/shardingsphere/pull/22739#discussion_r1042953244


##########
kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/subscribe/CDCSubscriptionSubscribe.java:
##########
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.data.pipeline.cdc.core.subscribe;
+
+import com.google.common.eventbus.Subscribe;
+import lombok.AccessLevel;
+import lombok.NoArgsConstructor;
+import org.apache.shardingsphere.data.pipeline.cdc.api.CDCJobAPI;
+import org.apache.shardingsphere.data.pipeline.cdc.api.CDCJobAPIFactory;
+import org.apache.shardingsphere.mode.manager.cluster.coordinator.cdc.CreateSubscriptionJobEvent;
+
+/**
+ * CDC subscription subscribe.
+ */
+@NoArgsConstructor(access = AccessLevel.PRIVATE)
+public final class CDCSubscriptionSubscribe {
+    
+    private static final CDCSubscriptionSubscribe INSTANCE = new CDCSubscriptionSubscribe();
+    
+    private final CDCJobAPI jobAPI = CDCJobAPIFactory.getInstance();
+    
+    /**
+     * Get instance.
+     *
+     * @return instance
+     */
+    public static CDCSubscriptionSubscribe getInstance() {
+        return INSTANCE;
+    }
+    
+    /**
+     * Start resharding job.
+     *
+     * @param event start resharding event.

Review Comment:
   Javadoc does not match method.



##########
kernel/data-pipeline/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/handler/SubscriptionRequestHandler.java:
##########
@@ -47,8 +47,8 @@ public void userEventTriggered(final ChannelHandlerContext ctx, final Object evt
     
     private CreateSubscriptionRequest buildCreateSubscriptionRequest() {
         // TODO the parameter shouldn't hard code, will be fixed when completed
-        TableName tableName = TableName.newBuilder().build();
-        return CreateSubscriptionRequest.newBuilder().setSubscriptionMode(SubscriptionMode.INCREMENTAL).setSubscriptionName("sharding_db").setDatabase("sharding_db")
+        TableName tableName = TableName.newBuilder().setName("t_order").build();
+        return CreateSubscriptionRequest.newBuilder().setSubscriptionMode(SubscriptionMode.INCREMENTAL).setSubscriptionName("subscribe_sharding_db").setDatabase("sharding_db")
                 .addTableNames(tableName).build();

Review Comment:
   This test code could live in a local test class and not be committed (it could be moved to an E2E test later); `cdc-client` could be used as a library.



##########
kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPIImpl.java:
##########
@@ -0,0 +1,195 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.data.pipeline.cdc.api.impl;
+
+import com.google.common.base.Joiner;
+import com.google.common.base.Strings;
+import org.apache.commons.codec.digest.DigestUtils;
+import org.apache.shardingsphere.data.pipeline.api.config.PipelineTaskConfiguration;
+import org.apache.shardingsphere.data.pipeline.api.config.job.PipelineJobConfiguration;
+import org.apache.shardingsphere.data.pipeline.api.config.job.yaml.YamlPipelineJobConfiguration;
+import org.apache.shardingsphere.data.pipeline.api.config.process.PipelineProcessConfiguration;
+import org.apache.shardingsphere.data.pipeline.api.context.PipelineJobItemContext;
+import org.apache.shardingsphere.data.pipeline.api.context.PipelineProcessContext;
+import org.apache.shardingsphere.data.pipeline.api.datanode.JobDataNodeEntry;
+import org.apache.shardingsphere.data.pipeline.api.datanode.JobDataNodeLine;
+import org.apache.shardingsphere.data.pipeline.api.datasource.config.PipelineDataSourceConfiguration;
+import org.apache.shardingsphere.data.pipeline.api.datasource.config.PipelineDataSourceConfigurationFactory;
+import org.apache.shardingsphere.data.pipeline.api.datasource.config.impl.ShardingSpherePipelineDataSourceConfiguration;
+import org.apache.shardingsphere.data.pipeline.api.datasource.config.yaml.YamlPipelineDataSourceConfigurationSwapper;
+import org.apache.shardingsphere.data.pipeline.api.job.JobStatus;
+import org.apache.shardingsphere.data.pipeline.api.job.PipelineJobId;
+import org.apache.shardingsphere.data.pipeline.api.job.progress.PipelineJobItemProgress;
+import org.apache.shardingsphere.data.pipeline.api.pojo.PipelineJobInfo;
+import org.apache.shardingsphere.data.pipeline.cdc.api.CDCJobAPI;
+import org.apache.shardingsphere.data.pipeline.cdc.api.job.type.CDCJobType;
+import org.apache.shardingsphere.data.pipeline.cdc.config.job.CDCJobConfiguration;
+import org.apache.shardingsphere.data.pipeline.cdc.core.job.CDCJob;
+import org.apache.shardingsphere.data.pipeline.cdc.core.job.CDCJobId;
+import org.apache.shardingsphere.data.pipeline.cdc.yaml.job.YamlCDCJobConfiguration;
+import org.apache.shardingsphere.data.pipeline.cdc.yaml.job.YamlCDCJobConfigurationSwapper;
+import org.apache.shardingsphere.data.pipeline.core.api.impl.AbstractPipelineJobAPIImpl;
+import org.apache.shardingsphere.data.pipeline.core.context.PipelineContext;
+import org.apache.shardingsphere.data.pipeline.spi.job.JobType;
+import org.apache.shardingsphere.elasticjob.infra.pojo.JobConfigurationPOJO;
+import org.apache.shardingsphere.infra.datanode.DataNode;
+import org.apache.shardingsphere.infra.datasource.props.DataSourcePropertiesCreator;
+import org.apache.shardingsphere.infra.metadata.database.ShardingSphereDatabase;
+import org.apache.shardingsphere.infra.yaml.config.pojo.YamlRootConfiguration;
+import org.apache.shardingsphere.infra.yaml.config.pojo.rule.YamlRuleConfiguration;
+import org.apache.shardingsphere.infra.yaml.config.swapper.resource.YamlDataSourceConfigurationSwapper;
+import org.apache.shardingsphere.infra.yaml.config.swapper.rule.YamlRuleConfigurationSwapperEngine;
+import org.apache.shardingsphere.mode.manager.cluster.coordinator.cdc.CreateSubscriptionJobEvent;
+
+import javax.sql.DataSource;
+import java.nio.charset.StandardCharsets;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+
+/**
+ * CDC job API impl.
+ */
+public final class CDCJobAPIImpl extends AbstractPipelineJobAPIImpl implements CDCJobAPI {
+    
+    private final YamlDataSourceConfigurationSwapper dataSourceConfigSwapper = new YamlDataSourceConfigurationSwapper();
+    
+    private final YamlRuleConfigurationSwapperEngine ruleConfigSwapperEngine = new YamlRuleConfigurationSwapperEngine();
+    
+    private final YamlPipelineDataSourceConfigurationSwapper pipelineDataSourceConfigSwapper = new YamlPipelineDataSourceConfigurationSwapper();
+    
+    @Override
+    public String createJobAndStart(final CreateSubscriptionJobEvent event) {
+        YamlCDCJobConfiguration yamlJobConfig = new YamlCDCJobConfiguration();
+        Map<String, String> tablesFirstDataNodesMap = new HashMap<>(event.getDataNodesMap().size(), 1);
+        for (Entry<String, List<DataNode>> entry : event.getDataNodesMap().entrySet()) {
+            tablesFirstDataNodesMap.put(entry.getKey(), new JobDataNodeLine(Collections.singleton(new JobDataNodeEntry(entry.getKey(), entry.getValue().subList(0, 1)))).marshal());
+        }
+        yamlJobConfig.setDatabase(event.getDatabase());
+        yamlJobConfig.setTableNames(event.getSubscribeTableNames());
+        yamlJobConfig.setSubscriptionName(event.getSubscriptionName());
+        yamlJobConfig.setSubscriptionMode(event.getSubscriptionMode());
+        ShardingSphereDatabase database = PipelineContext.getContextManager().getMetaDataContexts().getMetaData().getDatabase(event.getDatabase());
+        yamlJobConfig.setDataSourceConfiguration(pipelineDataSourceConfigSwapper.swapToYamlConfiguration(getDataSourceConfiguration(database)));
+        yamlJobConfig.setTablesFirstDataNodesMap(tablesFirstDataNodesMap);
+        extendYamlJobConfiguration(yamlJobConfig);
+        CDCJobConfiguration jobConfig = new YamlCDCJobConfigurationSwapper().swapToObject(yamlJobConfig);
+        start(jobConfig);
+        return jobConfig.getJobId();
+    }
+    
+    private ShardingSpherePipelineDataSourceConfiguration getDataSourceConfiguration(final ShardingSphereDatabase database) {
+        Map<String, Map<String, Object>> dataSourceProps = new HashMap<>();
+        for (Entry<String, DataSource> entry : database.getResourceMetaData().getDataSources().entrySet()) {
+            dataSourceProps.put(entry.getKey(), dataSourceConfigSwapper.swapToMap(DataSourcePropertiesCreator.create(entry.getValue())));
+        }
+        YamlRootConfiguration targetRootConfig = new YamlRootConfiguration();
+        targetRootConfig.setDatabaseName(database.getName());
+        targetRootConfig.setDataSources(dataSourceProps);
+        Collection<YamlRuleConfiguration> yamlRuleConfigurations = ruleConfigSwapperEngine.swapToYamlRuleConfigurations(database.getRuleMetaData().getConfigurations());
+        targetRootConfig.setRules(yamlRuleConfigurations);
+        return new ShardingSpherePipelineDataSourceConfiguration(targetRootConfig);
+    }
+    
+    @Override
+    public void extendYamlJobConfiguration(final YamlPipelineJobConfiguration yamlJobConfig) {
+        YamlCDCJobConfiguration config = (YamlCDCJobConfiguration) yamlJobConfig;
+        if (null == yamlJobConfig.getJobId()) {
+            config.setJobId(generateJobId(config));
+        }
+        if (Strings.isNullOrEmpty(config.getSourceDatabaseType())) {
+            PipelineDataSourceConfiguration sourceDataSourceConfig = PipelineDataSourceConfigurationFactory.newInstance(config.getDataSourceConfiguration().getType(),
+                    config.getDataSourceConfiguration().getParameter());
+            config.setSourceDatabaseType(sourceDataSourceConfig.getDatabaseType().getType());
+        }
+    }
+    
+    private String generateJobId(final YamlCDCJobConfiguration config) {
+        CDCJobId jobId = new CDCJobId(config.getDatabase(), config.getSubscriptionName());
+        return marshalJobId(jobId);
+    }
+    
+    @Override
+    public PipelineTaskConfiguration buildTaskConfiguration(final PipelineJobConfiguration pipelineJobConfig, final int jobShardingItem, final PipelineProcessConfiguration pipelineProcessConfig) {
+        // TODO to be implement
+        return null;
+    }
+    
+    @Override
+    public PipelineProcessContext buildPipelineProcessContext(final PipelineJobConfiguration pipelineJobConfig) {
+        // TODO to be implement
+        return null;
+    }
+    
+    @Override
+    public PipelineJobConfiguration getJobConfiguration(final String jobId) {
+        return getJobConfiguration(getElasticJobConfigPOJO(jobId));
+    }
+    
+    @Override
+    protected PipelineJobConfiguration getJobConfiguration(final JobConfigurationPOJO jobConfigPOJO) {
+        return new YamlCDCJobConfigurationSwapper().swapToObject(jobConfigPOJO.getJobParameter());
+    }
+    
+    @Override
+    public void persistJobItemProgress(final PipelineJobItemContext jobItemContext) {
+        // TODO to be implement
+    }
+    
+    @Override
+    public PipelineJobItemProgress getJobItemProgress(final String jobId, final int shardingItem) {
+        // TODO to be implement
+        return null;
+    }
+    
+    @Override
+    public void updateJobItemStatus(final String jobId, final int shardingItem, final JobStatus status) {
+        // TODO to be implement
+    }
+    
+    @Override
+    protected String marshalJobIdLeftPart(final PipelineJobId pipelineJobId) {

Review Comment:
   It's better to put it together with the jobId-related methods, e.g. `generateJobId`.



##########
kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/config/job/CDCJobConfiguration.java:
##########
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.data.pipeline.cdc.config.job;
+
+import lombok.Getter;
+import lombok.RequiredArgsConstructor;
+import org.apache.shardingsphere.data.pipeline.api.config.job.PipelineJobConfiguration;
+import org.apache.shardingsphere.data.pipeline.api.datasource.config.PipelineDataSourceConfiguration;
+
+import java.util.List;
+
+/**
+ * CDC job configuration.
+ */
+@Getter
+@RequiredArgsConstructor
+public final class CDCJobConfiguration implements PipelineJobConfiguration {
+    
+    private final String jobId;
+    
+    private final String database;
+    
+    private final List<String> tableNames;
+    
+    private final String subscriptionName;
+    
+    private final String subscriptionMode;
+    
+    private final String sourceDatabaseType;
+    
+    private final PipelineDataSourceConfiguration dataSourceConfiguration;
+    
+    @Override
+    public int getJobShardingCount() {
+        return 1;

Review Comment:
   It seems the job sharding count won't always be `1`; does it need a TODO?
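
A possible shape for such a TODO, as a sketch only. It assumes the count might later be derived from a list of data node entries; the PR does not say so, and the class `ShardingCountSketch` and the field `jobShardingDataNodes` are hypothetical names introduced for illustration.

```java
import java.util.List;

/**
 * Hypothetical illustration of a sharding count that is not hard-coded.
 */
final class ShardingCountSketch {
    
    private final List<String> jobShardingDataNodes;
    
    ShardingCountSketch(final List<String> jobShardingDataNodes) {
        this.jobShardingDataNodes = jobShardingDataNodes;
    }
    
    int getJobShardingCount() {
        // TODO confirm the real rule once CDC jobs can be split; deriving it from data nodes is an assumption
        if (null == jobShardingDataNodes || jobShardingDataNodes.isEmpty()) {
            // Fall back to a single shard, which matches the value currently returned in the PR.
            return 1;
        }
        return jobShardingDataNodes.size();
    }
}
```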



##########
kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPIImpl.java:
##########
@@ -0,0 +1,195 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.data.pipeline.cdc.api.impl;
+
+import com.google.common.base.Joiner;
+import com.google.common.base.Strings;
+import org.apache.commons.codec.digest.DigestUtils;
+import org.apache.shardingsphere.data.pipeline.api.config.PipelineTaskConfiguration;
+import org.apache.shardingsphere.data.pipeline.api.config.job.PipelineJobConfiguration;
+import org.apache.shardingsphere.data.pipeline.api.config.job.yaml.YamlPipelineJobConfiguration;
+import org.apache.shardingsphere.data.pipeline.api.config.process.PipelineProcessConfiguration;
+import org.apache.shardingsphere.data.pipeline.api.context.PipelineJobItemContext;
+import org.apache.shardingsphere.data.pipeline.api.context.PipelineProcessContext;
+import org.apache.shardingsphere.data.pipeline.api.datanode.JobDataNodeEntry;
+import org.apache.shardingsphere.data.pipeline.api.datanode.JobDataNodeLine;
+import org.apache.shardingsphere.data.pipeline.api.datasource.config.PipelineDataSourceConfiguration;
+import org.apache.shardingsphere.data.pipeline.api.datasource.config.PipelineDataSourceConfigurationFactory;
+import org.apache.shardingsphere.data.pipeline.api.datasource.config.impl.ShardingSpherePipelineDataSourceConfiguration;
+import org.apache.shardingsphere.data.pipeline.api.datasource.config.yaml.YamlPipelineDataSourceConfigurationSwapper;
+import org.apache.shardingsphere.data.pipeline.api.job.JobStatus;
+import org.apache.shardingsphere.data.pipeline.api.job.PipelineJobId;
+import org.apache.shardingsphere.data.pipeline.api.job.progress.PipelineJobItemProgress;
+import org.apache.shardingsphere.data.pipeline.api.pojo.PipelineJobInfo;
+import org.apache.shardingsphere.data.pipeline.cdc.api.CDCJobAPI;
+import org.apache.shardingsphere.data.pipeline.cdc.api.job.type.CDCJobType;
+import org.apache.shardingsphere.data.pipeline.cdc.config.job.CDCJobConfiguration;
+import org.apache.shardingsphere.data.pipeline.cdc.core.job.CDCJob;
+import org.apache.shardingsphere.data.pipeline.cdc.core.job.CDCJobId;
+import org.apache.shardingsphere.data.pipeline.cdc.yaml.job.YamlCDCJobConfiguration;
+import org.apache.shardingsphere.data.pipeline.cdc.yaml.job.YamlCDCJobConfigurationSwapper;
+import org.apache.shardingsphere.data.pipeline.core.api.impl.AbstractPipelineJobAPIImpl;
+import org.apache.shardingsphere.data.pipeline.core.context.PipelineContext;
+import org.apache.shardingsphere.data.pipeline.spi.job.JobType;
+import org.apache.shardingsphere.elasticjob.infra.pojo.JobConfigurationPOJO;
+import org.apache.shardingsphere.infra.datanode.DataNode;
+import org.apache.shardingsphere.infra.datasource.props.DataSourcePropertiesCreator;
+import org.apache.shardingsphere.infra.metadata.database.ShardingSphereDatabase;
+import org.apache.shardingsphere.infra.yaml.config.pojo.YamlRootConfiguration;
+import org.apache.shardingsphere.infra.yaml.config.pojo.rule.YamlRuleConfiguration;
+import org.apache.shardingsphere.infra.yaml.config.swapper.resource.YamlDataSourceConfigurationSwapper;
+import org.apache.shardingsphere.infra.yaml.config.swapper.rule.YamlRuleConfigurationSwapperEngine;
+import org.apache.shardingsphere.mode.manager.cluster.coordinator.cdc.CreateSubscriptionJobEvent;
+
+import javax.sql.DataSource;
+import java.nio.charset.StandardCharsets;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+
+/**
+ * CDC job API impl.
+ */
+public final class CDCJobAPIImpl extends AbstractPipelineJobAPIImpl implements CDCJobAPI {
+    
+    private final YamlDataSourceConfigurationSwapper dataSourceConfigSwapper = new YamlDataSourceConfigurationSwapper();
+    
+    private final YamlRuleConfigurationSwapperEngine ruleConfigSwapperEngine = new YamlRuleConfigurationSwapperEngine();
+    
+    private final YamlPipelineDataSourceConfigurationSwapper pipelineDataSourceConfigSwapper = new YamlPipelineDataSourceConfigurationSwapper();
+    
+    @Override
+    public String createJobAndStart(final CreateSubscriptionJobEvent event) {
+        YamlCDCJobConfiguration yamlJobConfig = new YamlCDCJobConfiguration();
+        Map<String, String> tablesFirstDataNodesMap = new HashMap<>(event.getDataNodesMap().size(), 1);
+        for (Entry<String, List<DataNode>> entry : event.getDataNodesMap().entrySet()) {
+            tablesFirstDataNodesMap.put(entry.getKey(), new JobDataNodeLine(Collections.singleton(new JobDataNodeEntry(entry.getKey(), entry.getValue().subList(0, 1)))).marshal());
+        }
+        yamlJobConfig.setDatabase(event.getDatabase());
+        yamlJobConfig.setTableNames(event.getSubscribeTableNames());
+        yamlJobConfig.setSubscriptionName(event.getSubscriptionName());
+        yamlJobConfig.setSubscriptionMode(event.getSubscriptionMode());
+        ShardingSphereDatabase database = PipelineContext.getContextManager().getMetaDataContexts().getMetaData().getDatabase(event.getDatabase());
+        yamlJobConfig.setDataSourceConfiguration(pipelineDataSourceConfigSwapper.swapToYamlConfiguration(getDataSourceConfiguration(database)));
+        yamlJobConfig.setTablesFirstDataNodesMap(tablesFirstDataNodesMap);
+        extendYamlJobConfiguration(yamlJobConfig);
+        CDCJobConfiguration jobConfig = new YamlCDCJobConfigurationSwapper().swapToObject(yamlJobConfig);
+        start(jobConfig);
+        return jobConfig.getJobId();
+    }
+    
+    private ShardingSpherePipelineDataSourceConfiguration getDataSourceConfiguration(final ShardingSphereDatabase database) {
+        Map<String, Map<String, Object>> dataSourceProps = new HashMap<>();
+        for (Entry<String, DataSource> entry : database.getResourceMetaData().getDataSources().entrySet()) {
+            dataSourceProps.put(entry.getKey(), dataSourceConfigSwapper.swapToMap(DataSourcePropertiesCreator.create(entry.getValue())));
+        }
+        YamlRootConfiguration targetRootConfig = new YamlRootConfiguration();
+        targetRootConfig.setDatabaseName(database.getName());
+        targetRootConfig.setDataSources(dataSourceProps);
+        Collection<YamlRuleConfiguration> yamlRuleConfigurations = ruleConfigSwapperEngine.swapToYamlRuleConfigurations(database.getRuleMetaData().getConfigurations());
+        targetRootConfig.setRules(yamlRuleConfigurations);
+        return new ShardingSpherePipelineDataSourceConfiguration(targetRootConfig);
+    }
+    
+    @Override
+    public void extendYamlJobConfiguration(final YamlPipelineJobConfiguration yamlJobConfig) {
+        YamlCDCJobConfiguration config = (YamlCDCJobConfiguration) yamlJobConfig;
+        if (null == yamlJobConfig.getJobId()) {
+            config.setJobId(generateJobId(config));
+        }
+        if (Strings.isNullOrEmpty(config.getSourceDatabaseType())) {
+            PipelineDataSourceConfiguration sourceDataSourceConfig = PipelineDataSourceConfigurationFactory.newInstance(config.getDataSourceConfiguration().getType(),
+                    config.getDataSourceConfiguration().getParameter());
+            config.setSourceDatabaseType(sourceDataSourceConfig.getDatabaseType().getType());
+        }
+    }
+    
+    private String generateJobId(final YamlCDCJobConfiguration config) {
+        CDCJobId jobId = new CDCJobId(config.getDatabase(), config.getSubscriptionName());
+        return marshalJobId(jobId);
+    }
+    
+    @Override
+    public PipelineTaskConfiguration buildTaskConfiguration(final PipelineJobConfiguration pipelineJobConfig, final int jobShardingItem, final PipelineProcessConfiguration pipelineProcessConfig) {
+        // TODO to be implement
+        return null;
+    }
+    
+    @Override
+    public PipelineProcessContext buildPipelineProcessContext(final PipelineJobConfiguration pipelineJobConfig) {
+        // TODO to be implement
+        return null;
+    }
+    
+    @Override
+    public PipelineJobConfiguration getJobConfiguration(final String jobId) {
+        return getJobConfiguration(getElasticJobConfigPOJO(jobId));
+    }
+    
+    @Override
+    protected PipelineJobConfiguration getJobConfiguration(final JobConfigurationPOJO jobConfigPOJO) {
+        return new YamlCDCJobConfigurationSwapper().swapToObject(jobConfigPOJO.getJobParameter());
+    }
+    
+    @Override
+    public void persistJobItemProgress(final PipelineJobItemContext jobItemContext) {
+        // TODO to be implement
+    }
+    
+    @Override
+    public PipelineJobItemProgress getJobItemProgress(final String jobId, final int shardingItem) {
+        // TODO to be implement
+        return null;
+    }
+    
+    @Override
+    public void updateJobItemStatus(final String jobId, final int shardingItem, final JobStatus status) {
+        // TODO to be implement
+    }
+    
+    @Override
+    protected String marshalJobIdLeftPart(final PipelineJobId pipelineJobId) {
+        CDCJobId jobId = (CDCJobId) pipelineJobId;
+        String text = Joiner.on('|').join(jobId.getDatabaseName(), jobId.getSubscriptionName());
+        return DigestUtils.md5Hex(text.getBytes(StandardCharsets.UTF_8));
+    }
+    
+    @Override
+    protected PipelineJobInfo getJobInfo(final String jobId) {
+        // TODO to be implement
+        return null;
+    }
+    
+    @Override
+    protected String getJobClassName() {
+        return CDCJob.class.getName();
+    }
+    
+    @Override
+    protected YamlPipelineJobConfiguration swapToYamlJobConfiguration(final PipelineJobConfiguration jobConfig) {

Review Comment:
   It would be better to place this method next to the other job configuration methods, e.g. `getJobConfiguration`.



##########
kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/listener/CDCContextManagerLifecycleListener.java:
##########
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.data.pipeline.cdc.core.listener;
+
+import lombok.extern.slf4j.Slf4j;
+import org.apache.shardingsphere.data.pipeline.cdc.core.subscribe.CDCSubscriptionSubscribe;
+import org.apache.shardingsphere.infra.config.mode.ModeConfiguration;
+import org.apache.shardingsphere.mode.manager.ContextManager;
+import org.apache.shardingsphere.mode.manager.listener.ContextManagerLifecycleListener;
+
+/**
+ * CDC context manager lifecycle listener.
+ */
+@Slf4j
+public final class CDCContextManagerLifecycleListener implements ContextManagerLifecycleListener {

Review Comment:
   We could remove this class and its related configuration if the event bus event is not required.



##########
proxy/backend/src/main/java/org/apache/shardingsphere/proxy/backend/handler/cdc/CDCBackendHandler.java:
##########
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.proxy.backend.handler.cdc;
+
+import com.google.common.base.Strings;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.shardingsphere.data.pipeline.cdc.common.CDCResponseErrorCode;
+import org.apache.shardingsphere.data.pipeline.cdc.generator.CDCResponseGenerator;
+import org.apache.shardingsphere.data.pipeline.cdc.protocol.request.CDCRequest;
+import org.apache.shardingsphere.data.pipeline.cdc.protocol.request.CreateSubscriptionRequest;
+import org.apache.shardingsphere.data.pipeline.cdc.protocol.request.CreateSubscriptionRequest.TableName;
+import org.apache.shardingsphere.data.pipeline.cdc.protocol.response.CDCResponse;
+import org.apache.shardingsphere.data.pipeline.core.context.PipelineContext;
+import org.apache.shardingsphere.infra.datanode.DataNode;
+import org.apache.shardingsphere.infra.metadata.database.ShardingSphereDatabase;
+import org.apache.shardingsphere.mode.manager.cluster.coordinator.cdc.CreateSubscriptionJobEvent;
+import org.apache.shardingsphere.proxy.backend.context.ProxyContext;
+import org.apache.shardingsphere.sharding.rule.ShardingRule;
+import org.apache.shardingsphere.sharding.rule.TableRule;
+
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+
+/**
+ * CDC backend handler.
+ */
+@Slf4j
+public final class CDCBackendHandler {
+    
+    /**
+     * Create subscription.
+     *
+     * @param request CDC request
+     * @return CDC response
+     */
+    public CDCResponse createSubscription(final CDCRequest request) {
+        CreateSubscriptionRequest subscriptionRequest = request.getCreateSubscription();
+        ShardingSphereDatabase database = PipelineContext.getContextManager().getMetaDataContexts().getMetaData().getDatabase(subscriptionRequest.getDatabase());
+        if (null == database) {
+            return CDCResponseGenerator.failed(request.getRequestId(), CDCResponseErrorCode.SERVER_ERROR, String.format("%s database is not exists", subscriptionRequest.getDatabase()));
+        }
+        List<String> tableNames = new LinkedList<>();
+        for (TableName each : subscriptionRequest.getTableNamesList()) {
+            tableNames.add(Strings.isNullOrEmpty(each.getSchema()) ? each.getName() : String.join(".", each.getSchema(), each.getName()));
+        }
+        Optional<ShardingRule> rule = database.getRuleMetaData().getRules().stream().filter(each -> each instanceof ShardingRule).map(each -> (ShardingRule) each).findFirst();
+        if (!rule.isPresent()) {
+            return CDCResponseGenerator.failed(request.getRequestId(), CDCResponseErrorCode.SERVER_ERROR, "Not find sharding rule");
+        }
+        Map<String, List<DataNode>> actualDataNodesMap = new HashMap<>();
+        for (String each : tableNames) {
+            actualDataNodesMap.put(each, getActualDataNodes(rule.get(), each));
+        }
+        CreateSubscriptionJobEvent event = new CreateSubscriptionJobEvent(subscriptionRequest.getDatabase(), tableNames, subscriptionRequest.getSubscriptionName(),
+                subscriptionRequest.getSubscriptionMode().name(), actualDataNodesMap);
+        ProxyContext.getInstance().getContextManager().getInstanceContext().getEventBusContext().post(event);

Review Comment:
   Could we remove the event bus event and invoke the underlying class directly? The proxy-backend module already depends on the cdc-core module.
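
A minimal sketch of the direct-invocation alternative, assuming the handler can reach the job API without going through the event bus. All types here (`SketchSubscriptionParameter`, `SketchJobApi`, `SketchDirectHandler`) are hypothetical stand-ins and not classes from the PR. In the diff, `createJobAndStart` returns a job ID, while the `post(event)` call does not use any return value, so a direct call would presumably let the handler pass the job ID back to the client.

```java
import java.util.List;

// Hypothetical stand-in for the data carried by CreateSubscriptionJobEvent.
final class SketchSubscriptionParameter {
    
    private final String database;
    
    private final String subscriptionName;
    
    private final List<String> tableNames;
    
    SketchSubscriptionParameter(final String database, final String subscriptionName, final List<String> tableNames) {
        this.database = database;
        this.subscriptionName = subscriptionName;
        this.tableNames = tableNames;
    }
    
    String getDatabase() {
        return database;
    }
    
    String getSubscriptionName() {
        return subscriptionName;
    }
    
    List<String> getTableNames() {
        return tableNames;
    }
}

// Hypothetical stand-in for the job API that creates and starts the job.
interface SketchJobApi {
    
    String createJobAndStart(SketchSubscriptionParameter parameter);
}

// Hypothetical handler: calls the job API directly instead of posting an event.
final class SketchDirectHandler {
    
    private final SketchJobApi jobApi;
    
    SketchDirectHandler(final SketchJobApi jobApi) {
        this.jobApi = jobApi;
    }
    
    // Returns the job ID produced by the API so the caller can build a response from it.
    String createSubscription(final String database, final String subscriptionName, final List<String> tableNames) {
        return jobApi.createJobAndStart(new SketchSubscriptionParameter(database, subscriptionName, tableNames));
    }
}
```

Injecting the API through the constructor is only one option; it keeps the handler easy to test with a stub implementation.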



##########
proxy/backend/src/test/java/org/apache/shardingsphere/proxy/backend/handler/cdc/CDCBackendHandlerTest.java:
##########
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.proxy.backend.handler.cdc;
+
+import org.apache.shardingsphere.data.pipeline.cdc.protocol.request.CDCRequest;
+import org.apache.shardingsphere.data.pipeline.cdc.protocol.request.CreateSubscriptionRequest;
+import org.apache.shardingsphere.data.pipeline.cdc.protocol.response.CDCResponse;
+import org.apache.shardingsphere.data.pipeline.cdc.protocol.response.CDCResponse.Status;
+import org.apache.shardingsphere.data.pipeline.core.context.PipelineContext;
+import org.apache.shardingsphere.infra.config.props.ConfigurationProperties;
+import org.apache.shardingsphere.infra.database.type.dialect.MySQLDatabaseType;
+import org.apache.shardingsphere.infra.instance.InstanceContext;
+import org.apache.shardingsphere.infra.metadata.ShardingSphereMetaData;
+import org.apache.shardingsphere.infra.metadata.database.ShardingSphereDatabase;
+import org.apache.shardingsphere.infra.metadata.database.rule.ShardingSphereRuleMetaData;
+import org.apache.shardingsphere.infra.rule.ShardingSphereRule;
+import org.apache.shardingsphere.mode.manager.ContextManager;
+import org.apache.shardingsphere.mode.metadata.MetaDataContexts;
+import org.apache.shardingsphere.mode.metadata.persist.MetaDataPersistService;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.mockito.MockedStatic;
+
+import java.util.Collections;
+import java.util.LinkedHashMap;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.mockito.Mockito.RETURNS_DEEP_STUBS;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.mockStatic;
+import static org.mockito.Mockito.when;
+
+public final class CDCBackendHandlerTest {
+    
+    private static MockedStatic<PipelineContext> pipelineContextMocked;
+    
+    private final CDCBackendHandler handler = new CDCBackendHandler();
+    
+    @BeforeClass
+    public static void beforeClass() {
+        MetaDataContexts metaDataContexts = new MetaDataContexts(mock(MetaDataPersistService.class),
+                new ShardingSphereMetaData(getDatabases(), mock(ShardingSphereRuleMetaData.class), new ConfigurationProperties(new Properties())));
+        ContextManager contextManager = new ContextManager(metaDataContexts, mock(InstanceContext.class));
+        pipelineContextMocked = mockStatic(PipelineContext.class);
+        pipelineContextMocked.when(PipelineContext::getContextManager).thenReturn(contextManager);
+    }
+    
+    private static Map<String, ShardingSphereDatabase> getDatabases() {
+        ShardingSphereDatabase database = mock(ShardingSphereDatabase.class, RETURNS_DEEP_STUBS);
+        when(database.getName()).thenReturn("sharding_db");
+        when(database.getProtocolType()).thenReturn(new MySQLDatabaseType());
+        Set<ShardingSphereRule> shardingRule = Collections.singleton(mock(ShardingSphereRule.class));
+        when(database.getRuleMetaData().getRules()).thenReturn(shardingRule);
+        Map<String, ShardingSphereDatabase> result = new LinkedHashMap<>(1, 1);
+        result.put("sharding_db", database);
+        return result;
+    }
+    
+    @AfterClass
+    public static void afterClass() {
+        pipelineContextMocked.close();
+    }
+    
+    @Test
+    public void assertCreateSubscriptionFailed() {
+        CDCRequest request = CDCRequest.newBuilder().setRequestId("1").setCreateSubscription(CreateSubscriptionRequest.newBuilder().setDatabase("none")).build();
+        CDCResponse actualResponse = handler.createSubscription(request);
+        assertThat(actualResponse.getStatus(), is(Status.FAILED));
+    }
+    
+    @Test
+    public void assertCreateSubscriptionSucceed() {
+        CDCRequest request = CDCRequest.newBuilder().setRequestId("1").setCreateSubscription(CreateSubscriptionRequest.newBuilder().setDatabase("sharding_db")).build();
+        CDCResponse actualResponse = handler.createSubscription(request);
+        assertThat(actualResponse.getStatus(), is(Status.FAILED));
+    }

Review Comment:
   `is(Status.FAILED)` doesn't match the method name `assertCreateSubscriptionSucceed`.
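
A simplified model of why the second test probably still fails, assuming the handler behaves as the diff suggests: the mocked rule is a plain `ShardingSphereRule`, not a `ShardingRule`, so the "Not find sharding rule" branch would be taken. `SketchStatus` and `SketchCreateSubscription` are hypothetical; only `Status.FAILED` is confirmed by the diff.

```java
import java.util.Set;

// Hypothetical status enum for this sketch.
enum SketchStatus {
    SUCCEED, FAILED
}

final class SketchCreateSubscription {
    
    // Models the two failure branches visible in the handler: unknown database, then missing sharding rule.
    static SketchStatus createSubscription(final Set<String> databases, final boolean shardingRulePresent, final String database) {
        if (!databases.contains(database)) {
            return SketchStatus.FAILED;
        }
        if (!shardingRulePresent) {
            return SketchStatus.FAILED;
        }
        return SketchStatus.SUCCEED;
    }
}
```

Under this reading, either the test could be renamed to describe the missing-rule failure (for example a hypothetical `assertCreateSubscriptionFailedWithoutShardingRule`), or the fixture could supply a sharding rule so that a success assertion matches the current name.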





[GitHub] [shardingsphere] sandynz commented on a diff in pull request #22739: Add CDC create subscription event and listener

Posted by GitBox <gi...@apache.org>.
sandynz commented on code in PR #22739:
URL: https://github.com/apache/shardingsphere/pull/22739#discussion_r1043015049


##########
kernel/data-pipeline/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/handler/SubscriptionRequestHandler.java:
##########
@@ -47,8 +47,8 @@ public void userEventTriggered(final ChannelHandlerContext ctx, final Object evt
     
     private CreateSubscriptionRequest buildCreateSubscriptionRequest() {

Review Comment:
   Could we remove `buildCreateSubscriptionRequest`? The request should be defined by the caller.
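
One way to read this suggestion, sketched with hypothetical types (`SketchCallerRequest` and `SketchCallerDefinedHandler` are not from the PR): the caller builds the request and hands it to the handler, which no longer hard-codes any subscription details.

```java
import java.util.List;

// Hypothetical request object assembled by the caller.
final class SketchCallerRequest {
    
    private final String database;
    
    private final String subscriptionName;
    
    private final List<String> tableNames;
    
    SketchCallerRequest(final String database, final String subscriptionName, final List<String> tableNames) {
        this.database = database;
        this.subscriptionName = subscriptionName;
        this.tableNames = tableNames;
    }
    
    String getDatabase() {
        return database;
    }
    
    String getSubscriptionName() {
        return subscriptionName;
    }
    
    List<String> getTableNames() {
        return tableNames;
    }
}

// Hypothetical handler: receives the request from the caller instead of building it internally.
final class SketchCallerDefinedHandler {
    
    private final SketchCallerRequest request;
    
    SketchCallerDefinedHandler(final SketchCallerRequest request) {
        this.request = request;
    }
    
    // Returns exactly the request the caller supplied; the handler adds nothing of its own.
    SketchCallerRequest getRequestToSend() {
        return request;
    }
}
```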





[GitHub] [shardingsphere] sandynz merged pull request #22739: Add CDC create subscription event and listener

Posted by GitBox <gi...@apache.org>.
sandynz merged PR #22739:
URL: https://github.com/apache/shardingsphere/pull/22739

