You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@shardingsphere.apache.org by az...@apache.org on 2023/04/21 03:34:02 UTC

[shardingsphere] branch master updated: Clean up CDC code and refactor StartCDCClientParameter (#25254)

This is an automated email from the ASF dual-hosted git repository.

azexin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new b0bd0a2f268 Clean up CDC code and refactor StartCDCClientParameter (#25254)
b0bd0a2f268 is described below

commit b0bd0a2f268e77445e841a57512414e02b033485
Author: Hongsheng Zhong <zh...@apache.org>
AuthorDate: Fri Apr 21 11:33:55 2023 +0800

    Clean up CDC code and refactor StartCDCClientParameter (#25254)
    
    * Remove unused ImportDataSourceParameter
    
    * Remove redundant try-catch in CDCRequestHandler.processDataRecords
    
    * Extract Consumer from StartCDCClientParameter
---
 .../data/pipeline/cdc/client/CDCClient.java        | 11 +++++--
 .../cdc/client/handler/CDCRequestHandler.java      | 15 ++--------
 .../parameter/ImportDataSourceParameter.java       | 35 ----------------------
 .../client/parameter/StartCDCClientParameter.java  |  4 ---
 .../pipeline/cdc/client/example/Bootstrap.java     |  4 +--
 .../test/e2e/data/pipeline/cases/cdc/CDCE2EIT.java |  6 ++--
 6 files changed, 17 insertions(+), 58 deletions(-)

diff --git a/kernel/data-pipeline/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/CDCClient.java b/kernel/data-pipeline/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/CDCClient.java
index d8532932415..459e6da6e4e 100644
--- a/kernel/data-pipeline/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/CDCClient.java
+++ b/kernel/data-pipeline/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/CDCClient.java
@@ -33,6 +33,10 @@ import org.apache.shardingsphere.data.pipeline.cdc.client.handler.CDCRequestHand
 import org.apache.shardingsphere.data.pipeline.cdc.client.handler.LoginRequestHandler;
 import org.apache.shardingsphere.data.pipeline.cdc.client.parameter.StartCDCClientParameter;
 import org.apache.shardingsphere.data.pipeline.cdc.protocol.response.CDCResponse;
+import org.apache.shardingsphere.data.pipeline.cdc.protocol.response.DataRecordResult.Record;
+
+import java.util.List;
+import java.util.function.Consumer;
 
 /**
  * CDC client.
@@ -42,9 +46,12 @@ public final class CDCClient {
     
     private final StartCDCClientParameter parameter;
     
-    public CDCClient(final StartCDCClientParameter parameter) {
+    private final Consumer<List<Record>> consumer;
+    
+    public CDCClient(final StartCDCClientParameter parameter, final Consumer<List<Record>> consumer) {
         validateParameter(parameter);
         this.parameter = parameter;
+        this.consumer = consumer;
     }
     
     private void validateParameter(final StartCDCClientParameter parameter) {
@@ -85,7 +92,7 @@ public final class CDCClient {
                         channel.pipeline().addLast(new ProtobufVarint32LengthFieldPrepender());
                         channel.pipeline().addLast(new ProtobufEncoder());
                         channel.pipeline().addLast(new LoginRequestHandler(parameter.getUsername(), parameter.getPassword()));
-                        channel.pipeline().addLast(new CDCRequestHandler(parameter));
+                        channel.pipeline().addLast(new CDCRequestHandler(parameter, consumer));
                     }
                 });
         try {
diff --git a/kernel/data-pipeline/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/handler/CDCRequestHandler.java b/kernel/data-pipeline/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/handler/CDCRequestHandler.java
index 425e0b1b9a3..1a2cce307bc 100644
--- a/kernel/data-pipeline/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/handler/CDCRequestHandler.java
+++ b/kernel/data-pipeline/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/handler/CDCRequestHandler.java
@@ -19,6 +19,7 @@ package org.apache.shardingsphere.data.pipeline.cdc.client.handler;
 
 import io.netty.channel.ChannelHandlerContext;
 import io.netty.channel.ChannelInboundHandlerAdapter;
+import lombok.RequiredArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.shardingsphere.data.pipeline.cdc.client.constant.ClientConnectionStatus;
 import org.apache.shardingsphere.data.pipeline.cdc.client.context.ClientConnectionContext;
@@ -43,6 +44,7 @@ import java.util.function.Consumer;
 /**
  * CDC request handler.
  */
+@RequiredArgsConstructor
 @Slf4j
 public final class CDCRequestHandler extends ChannelInboundHandlerAdapter {
     
@@ -50,11 +52,6 @@ public final class CDCRequestHandler extends ChannelInboundHandlerAdapter {
     
     private final Consumer<List<Record>> consumer;
     
-    public CDCRequestHandler(final StartCDCClientParameter parameter) {
-        this.parameter = parameter;
-        consumer = parameter.getConsumer();
-    }
-    
     @Override
     public void userEventTriggered(final ChannelHandlerContext ctx, final Object evt) {
         if (evt instanceof StreamDataEvent) {
@@ -91,13 +88,7 @@ public final class CDCRequestHandler extends ChannelInboundHandlerAdapter {
     
     private void processDataRecords(final ChannelHandlerContext ctx, final DataRecordResult result) {
         List<Record> recordsList = result.getRecordList();
-        try {
-            consumer.accept(recordsList);
-            // CHECKSTYLE:OFF
-        } catch (final Exception ex) {
-            // CHECKSTYLE:ON
-            throw new RuntimeException(ex);
-        }
+        consumer.accept(recordsList);
         ctx.channel().writeAndFlush(CDCRequest.newBuilder().setType(Type.ACK_STREAMING).setAckStreamingRequestBody(AckStreamingRequestBody.newBuilder().setAckId(result.getAckId()).build()).build());
     }
     
diff --git a/kernel/data-pipeline/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/parameter/ImportDataSourceParameter.java b/kernel/data-pipeline/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/parameter/ImportDataSourceParameter.java
deleted file mode 100644
index 52a9664fe6d..00000000000
--- a/kernel/data-pipeline/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/parameter/ImportDataSourceParameter.java
+++ /dev/null
@@ -1,35 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.shardingsphere.data.pipeline.cdc.client.parameter;
-
-import lombok.Getter;
-import lombok.RequiredArgsConstructor;
-
-/**
- * Import data source parameter.
- */
-@RequiredArgsConstructor
-@Getter
-public final class ImportDataSourceParameter {
-    
-    private final String jdbcUrl;
-    
-    private final String username;
-    
-    private final String password;
-}
diff --git a/kernel/data-pipeline/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/parameter/StartCDCClientParameter.java b/kernel/data-pipeline/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/parameter/StartCDCClientParameter.java
index d92a55fc6e2..f7148d4880b 100644
--- a/kernel/data-pipeline/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/parameter/StartCDCClientParameter.java
+++ b/kernel/data-pipeline/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/parameter/StartCDCClientParameter.java
@@ -21,10 +21,8 @@ import lombok.Getter;
 import lombok.RequiredArgsConstructor;
 import lombok.Setter;
 import org.apache.shardingsphere.data.pipeline.cdc.protocol.request.StreamDataRequestBody.SchemaTable;
-import org.apache.shardingsphere.data.pipeline.cdc.protocol.response.DataRecordResult.Record;
 
 import java.util.List;
-import java.util.function.Consumer;
 
 /**
  * Start CDC client parameter.
@@ -47,6 +45,4 @@ public final class StartCDCClientParameter {
     private List<SchemaTable> schemaTables;
     
     private boolean full;
-    
-    private final Consumer<List<Record>> consumer;
 }
diff --git a/kernel/data-pipeline/cdc/client/src/test/java/org/apache/shardingsphere/data/pipeline/cdc/client/example/Bootstrap.java b/kernel/data-pipeline/cdc/client/src/test/java/org/apache/shardingsphere/data/pipeline/cdc/client/example/Bootstrap.java
index fc968c3ddf9..3e371a5aa4b 100644
--- a/kernel/data-pipeline/cdc/client/src/test/java/org/apache/shardingsphere/data/pipeline/cdc/client/example/Bootstrap.java
+++ b/kernel/data-pipeline/cdc/client/src/test/java/org/apache/shardingsphere/data/pipeline/cdc/client/example/Bootstrap.java
@@ -36,7 +36,7 @@ public final class Bootstrap {
         // Pay attention to the time zone: to avoid time zone mismatches, make sure the time zone of the program is consistent with the time zone of the database server.
         // Note that mysql-connector-java 5.x ignores the serverTimezone JDBC parameter and uses the program's default time zone instead.
         // TimeZone.setDefault(TimeZone.getTimeZone("UTC"));
-        StartCDCClientParameter parameter = new StartCDCClientParameter(records -> log.info("records: {}", records));
+        StartCDCClientParameter parameter = new StartCDCClientParameter();
         parameter.setAddress("127.0.0.1");
         parameter.setPort(33071);
         parameter.setUsername("root");
@@ -44,7 +44,7 @@ public final class Bootstrap {
         parameter.setDatabase("sharding_db");
         parameter.setFull(true);
         parameter.setSchemaTables(Collections.singletonList(SchemaTable.newBuilder().setTable("t_order").build()));
-        CDCClient cdcClient = new CDCClient(parameter);
+        CDCClient cdcClient = new CDCClient(parameter, records -> log.info("records: {}", records));
         cdcClient.start();
     }
 }
diff --git a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/cdc/CDCE2EIT.java b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/cdc/CDCE2EIT.java
index 2a514cc8d99..58361dd07c7 100644
--- a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/cdc/CDCE2EIT.java
+++ b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/cdc/CDCE2EIT.java
@@ -170,8 +170,7 @@ class CDCE2EIT {
     private void startCDCClient(final PipelineContainerComposer containerComposer) {
         DataSource dataSource = StorageContainerUtils.generateDataSource(containerComposer.appendExtraParameter(containerComposer.getActualJdbcUrlTemplate(PipelineContainerComposer.DS_4, false)),
                 containerComposer.getUsername(), containerComposer.getPassword());
-        DataSourceRecordConsumer dataSourceRecordConsumer = new DataSourceRecordConsumer(dataSource, containerComposer.getDatabaseType());
-        StartCDCClientParameter parameter = new StartCDCClientParameter(dataSourceRecordConsumer);
+        StartCDCClientParameter parameter = new StartCDCClientParameter();
         parameter.setAddress("localhost");
         parameter.setPort(containerComposer.getContainerComposer().getProxyCDCPort());
         parameter.setUsername(ProxyContainerConstants.USERNAME);
@@ -181,7 +180,8 @@ class CDCE2EIT {
         parameter.setFull(true);
         String schema = containerComposer.getDatabaseType().isSchemaAvailable() ? "test" : "";
         parameter.setSchemaTables(Collections.singletonList(SchemaTable.newBuilder().setTable(SOURCE_TABLE_NAME).setSchema(schema).build()));
-        CompletableFuture.runAsync(() -> new CDCClient(parameter).start(), executor).whenComplete((unused, throwable) -> {
+        DataSourceRecordConsumer recordConsumer = new DataSourceRecordConsumer(dataSource, containerComposer.getDatabaseType());
+        CompletableFuture.runAsync(() -> new CDCClient(parameter, recordConsumer).start(), executor).whenComplete((unused, throwable) -> {
             if (null != throwable) {
                 log.error("cdc client sync failed, ", throwable);
             }