Posted to notifications@shardingsphere.apache.org by zh...@apache.org on 2023/02/17 05:59:50 UTC

[shardingsphere] branch master updated: CDC stream data source table name wildcard support (#24209)

This is an automated email from the ASF dual-hosted git repository.

zhonghongsheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new 0a14b167ab0 CDC stream data source table name wildcard support (#24209)
0a14b167ab0 is described below

commit 0a14b167ab066e79c2171e62692d3c748797ac59
Author: Xinze Guo <10...@users.noreply.github.com>
AuthorDate: Fri Feb 17 13:59:44 2023 +0800

    CDC stream data source table name wildcard support (#24209)
    
    * CDC stream data source table name wildcard support
    
    * Add error code
    
    * Improve
---
 .../user-manual/error-code/sql-error-code.cn.md    |  1 +
 .../user-manual/error-code/sql-error-code.en.md    |  1 +
 kernel/data-pipeline/cdc/client/pom.xml            |  4 ++
 .../client/example/{opengauss => }/Bootstrap.java  |  5 +-
 .../data/pipeline/cdc/api/impl/CDCJobAPI.java      |  8 +--
 .../importer/connector/CDCImporterConnector.java   |  6 +-
 .../data/pipeline/cdc/core/job/CDCJobId.java       |  6 +-
 .../NotFindStreamDataSourceTableException.java     | 33 +++++++++
 .../backend/handler/cdc/CDCBackendHandler.java     | 81 ++++++++++++++++++++--
 .../backend/handler/cdc/CDCBackendHandlerTest.java | 67 ++++++++++++++++++
 .../frontend/netty/CDCChannelInboundHandler.java   |  1 -
 11 files changed, 195 insertions(+), 18 deletions(-)

diff --git a/docs/document/content/user-manual/error-code/sql-error-code.cn.md b/docs/document/content/user-manual/error-code/sql-error-code.cn.md
index ad9cf82d337..211c412b93e 100644
--- a/docs/document/content/user-manual/error-code/sql-error-code.cn.md
+++ b/docs/document/content/user-manual/error-code/sql-error-code.cn.md
@@ -131,6 +131,7 @@ SQL 错误码以标准的 SQL State,Vendor Code 和详细错误信息提供,
 | HY000     | 18093       | Can not poll event because of binlog sync channel already closed. |
 | HY000     | 18095       | Can not find consistency check job of \`%s\`. |
 | HY000     | 18096       | Uncompleted consistency check job \`%s\` exists. |
+| HY000     | 18200       | Not find stream data source table. |
 
 ### DistSQL
 
diff --git a/docs/document/content/user-manual/error-code/sql-error-code.en.md b/docs/document/content/user-manual/error-code/sql-error-code.en.md
index 7ba9c53a34a..932965e2904 100644
--- a/docs/document/content/user-manual/error-code/sql-error-code.en.md
+++ b/docs/document/content/user-manual/error-code/sql-error-code.en.md
@@ -131,6 +131,7 @@ SQL error codes provide by standard `SQL State`, `Vendor Code` and `Reason`, whi
 | HY000     | 18093       | Can not poll event because of binlog sync channel already closed. |
 | HY000     | 18095       | Can not find consistency check job of \`%s\`. |
 | HY000     | 18096       | Uncompleted consistency check job \`%s\` exists. |
+| HY000     | 18200       | Not find stream data source table. |
 
 ### DistSQL
 
diff --git a/kernel/data-pipeline/cdc/client/pom.xml b/kernel/data-pipeline/cdc/client/pom.xml
index f549dbb815f..49f407aaf26 100644
--- a/kernel/data-pipeline/cdc/client/pom.xml
+++ b/kernel/data-pipeline/cdc/client/pom.xml
@@ -47,6 +47,10 @@
             <groupId>org.opengauss</groupId>
             <artifactId>opengauss-jdbc</artifactId>
         </dependency>
+        <dependency>
+            <groupId>org.postgresql</groupId>
+            <artifactId>postgresql</artifactId>
+        </dependency>
         <dependency>
             <groupId>mysql</groupId>
             <artifactId>mysql-connector-java</artifactId>
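
With the PostgreSQL driver now on the client test classpath, the relocated
Bootstrap example below can also point its import data source at a PostgreSQL
target. A minimal sketch, with hypothetical URL and credentials:

    ImportDataSourceParameter importDataSourceParam =
            new ImportDataSourceParameter("jdbc:postgresql://localhost:5432/cdc_db", "postgres", "postgres");
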
diff --git a/kernel/data-pipeline/cdc/client/src/test/java/org/apache/shardingsphere/data/pipeline/cdc/client/example/opengauss/Bootstrap.java b/kernel/data-pipeline/cdc/client/src/test/java/org/apache/shardingsphere/data/pipeline/cdc/client/example/Bootstrap.java
similarity index 92%
rename from kernel/data-pipeline/cdc/client/src/test/java/org/apache/shardingsphere/data/pipeline/cdc/client/example/opengauss/Bootstrap.java
rename to kernel/data-pipeline/cdc/client/src/test/java/org/apache/shardingsphere/data/pipeline/cdc/client/example/Bootstrap.java
index 5bafbec1ecf..a6b51537e75 100644
--- a/kernel/data-pipeline/cdc/client/src/test/java/org/apache/shardingsphere/data/pipeline/cdc/client/example/opengauss/Bootstrap.java
+++ b/kernel/data-pipeline/cdc/client/src/test/java/org/apache/shardingsphere/data/pipeline/cdc/client/example/Bootstrap.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.shardingsphere.data.pipeline.cdc.client.example.opengauss;
+package org.apache.shardingsphere.data.pipeline.cdc.client.example;
 
 import org.apache.shardingsphere.data.pipeline.cdc.client.CDCClient;
 import org.apache.shardingsphere.data.pipeline.cdc.client.parameter.ImportDataSourceParameter;
@@ -32,7 +32,7 @@ public final class Bootstrap {
      * @param args args
      */
     public static void main(final String[] args) {
-        ImportDataSourceParameter importDataSourceParam = new ImportDataSourceParameter("jdbc:opengauss:localhost:5432/cdc_db?stringtype=unspecified", "gaussdb", "Root@123");
+        ImportDataSourceParameter importDataSourceParam = new ImportDataSourceParameter("jdbc:opengauss://localhost:5432/cdc_db?stringtype=unspecified", "gaussdb", "Root@123");
         StartCDCClientParameter parameter = new StartCDCClientParameter(importDataSourceParam);
         parameter.setAddress("127.0.0.1");
         parameter.setPort(33071);
@@ -41,6 +41,7 @@ public final class Bootstrap {
         parameter.setDatabase("sharding_db");
         parameter.setFull(true);
         parameter.setSchemaTables(Collections.singletonList(SchemaTable.newBuilder().setTable("t_order").build()));
+        // support MySQL, PostgreSQL, openGauss
         parameter.setDatabaseType("openGauss");
         CDCClient cdcClient = new CDCClient(parameter);
         cdcClient.start();
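
The example above still subscribes to a single table, t_order. The wildcard
support added by this commit also accepts "*" in the schema and table fields;
a minimal sketch, assuming the same CDCClient setup as above, of streaming
every table in every non-system schema:

    // "*"/"*" is expanded server-side (see CDCBackendHandler below) into all
    // tables of all non-system schemas of the requested database.
    parameter.setSchemaTables(Collections.singletonList(
            SchemaTable.newBuilder().setSchema("*").setTable("*").build()));
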
diff --git a/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPI.java b/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPI.java
index bf7b9e97c9d..30ad2964e33 100644
--- a/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPI.java
+++ b/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPI.java
@@ -201,7 +201,7 @@ public final class CDCJobAPI extends AbstractInventoryIncrementalJobAPIImpl {
     @Override
     protected String marshalJobIdLeftPart(final PipelineJobId pipelineJobId) {
         CDCJobId jobId = (CDCJobId) pipelineJobId;
-        String text = Joiner.on('|').join(jobId.getDatabaseName(), jobId.getTableNames(), jobId.isFull());
+        String text = Joiner.on('|').join(jobId.getDatabaseName(), jobId.getSchemaTableNames(), jobId.isFull());
         return DigestUtils.md5Hex(text.getBytes(StandardCharsets.UTF_8));
     }
     
@@ -216,7 +216,7 @@ public final class CDCJobAPI extends AbstractInventoryIncrementalJobAPIImpl {
         return result;
     }
     
-    private TableNameSchemaNameMapping getTableNameSchemaNameMapping(final List<String> tableNames) {
+    private TableNameSchemaNameMapping getTableNameSchemaNameMapping(final Collection<String> tableNames) {
         Map<LogicTableName, String> tableNameSchemaMap = new LinkedHashMap<>();
         for (String each : tableNames) {
             String[] split = each.split("\\.");
@@ -243,7 +243,7 @@ public final class CDCJobAPI extends AbstractInventoryIncrementalJobAPIImpl {
         return result;
     }
     
-    private ImporterConfiguration buildImporterConfiguration(final CDCJobConfiguration jobConfig, final PipelineProcessConfiguration pipelineProcessConfig, final List<String> logicalTableNames,
+    private ImporterConfiguration buildImporterConfiguration(final CDCJobConfiguration jobConfig, final PipelineProcessConfiguration pipelineProcessConfig, final Collection<String> schemaTableNames,
                                                              final TableNameSchemaNameMapping tableNameSchemaNameMapping) {
         PipelineDataSourceConfiguration dataSourceConfig = PipelineDataSourceConfigurationFactory.newInstance(jobConfig.getDataSourceConfig().getType(),
                 jobConfig.getDataSourceConfig().getParameter());
@@ -251,7 +251,7 @@ public final class CDCJobAPI extends AbstractInventoryIncrementalJobAPIImpl {
         JobRateLimitAlgorithm writeRateLimitAlgorithm = processContext.getWriteRateLimitAlgorithm();
         int batchSize = pipelineProcessConfig.getWrite().getBatchSize();
         Map<LogicTableName, Set<String>> shardingColumnsMap = new ShardingColumnsExtractor()
-                .getShardingColumnsMap(jobConfig.getDataSourceConfig().getRootConfig().getRules(), logicalTableNames.stream().map(LogicTableName::new).collect(Collectors.toSet()));
+                .getShardingColumnsMap(jobConfig.getDataSourceConfig().getRootConfig().getRules(), schemaTableNames.stream().map(LogicTableName::new).collect(Collectors.toSet()));
         return new ImporterConfiguration(dataSourceConfig, shardingColumnsMap, tableNameSchemaNameMapping, batchSize, writeRateLimitAlgorithm, 0, 1);
     }
     
diff --git a/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/importer/connector/CDCImporterConnector.java b/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/importer/connector/CDCImporterConnector.java
index d1b3e8e90bc..77ccf10446c 100644
--- a/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/importer/connector/CDCImporterConnector.java
+++ b/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/importer/connector/CDCImporterConnector.java
@@ -38,6 +38,7 @@ import org.apache.shardingsphere.data.pipeline.core.util.ThreadUtil;
 import org.apache.shardingsphere.data.pipeline.spi.importer.ImporterType;
 import org.apache.shardingsphere.data.pipeline.spi.importer.connector.ImporterConnector;
 
+import java.util.Collection;
 import java.util.Comparator;
 import java.util.HashMap;
 import java.util.LinkedList;
@@ -84,11 +85,12 @@ public final class CDCImporterConnector implements ImporterConnector {
     
     private Thread incrementalImporterTask;
     
-    public CDCImporterConnector(final Channel channel, final String database, final int jobShardingCount, final List<String> tableNames, final Comparator<DataRecord> dataRecordComparator) {
+    public CDCImporterConnector(final Channel channel, final String database, final int jobShardingCount, final Collection<String> schemaTableNames,
+                                final Comparator<DataRecord> dataRecordComparator) {
         this.channel = channel;
         this.database = database;
         this.jobShardingCount = jobShardingCount;
-        tableNames.stream().filter(each -> each.contains(".")).forEach(each -> {
+        schemaTableNames.stream().filter(each -> each.contains(".")).forEach(each -> {
             String[] split = each.split("\\.");
             tableNameSchemaMap.put(split[1], split[0]);
         });
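
The schemaTableNames passed to the constructor mix plain table names with
schema-qualified "schema.table" entries, and only the latter contribute to
tableNameSchemaMap. A short sketch of the convention, using hypothetical
values:

    // "public.t_order" -> tableNameSchemaMap.put("t_order", "public")
    // "t_order"        -> contains no ".", so no schema mapping is recorded
    String[] split = "public.t_order".split("\\.");
    tableNameSchemaMap.put(split[1], split[0]);
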
diff --git a/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/job/CDCJobId.java b/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/job/CDCJobId.java
index 0182c35a74b..1a6590b8f5c 100644
--- a/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/job/CDCJobId.java
+++ b/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/job/CDCJobId.java
@@ -35,14 +35,14 @@ public final class CDCJobId extends AbstractPipelineJobId {
     
     private final String databaseName;
     
-    private final List<String> tableNames;
+    private final List<String> schemaTableNames;
     
     private final boolean full;
     
-    public CDCJobId(final String databaseName, final List<String> tableNames, final boolean full) {
+    public CDCJobId(final String databaseName, final List<String> schemaTableNames, final boolean full) {
         super(new CDCJobType(), CURRENT_VERSION);
         this.databaseName = databaseName;
-        this.tableNames = tableNames;
+        this.schemaTableNames = schemaTableNames;
         this.full = full;
     }
 }
diff --git a/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/exception/NotFindStreamDataSourceTableException.java b/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/exception/NotFindStreamDataSourceTableException.java
new file mode 100644
index 00000000000..fadddcace75
--- /dev/null
+++ b/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/exception/NotFindStreamDataSourceTableException.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.data.pipeline.cdc.exception;
+
+import org.apache.shardingsphere.data.pipeline.core.exception.PipelineSQLException;
+import org.apache.shardingsphere.infra.util.exception.external.sql.sqlstate.XOpenSQLState;
+
+/**
+ * Not find stream data source table exception.
+ */
+public final class NotFindStreamDataSourceTableException extends PipelineSQLException {
+    
+    private static final long serialVersionUID = 4003436152767041454L;
+    
+    public NotFindStreamDataSourceTableException() {
+        super(XOpenSQLState.GENERAL_ERROR, 200, "Not find stream data source table exception");
+    }
+}
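
The constructor passes error code 200; in the documentation rows added above
it appears as vendor code 18200, in line with the neighboring 18xxx pipeline
codes, because PipelineSQLException applies the pipeline kernel prefix. A
minimal sketch of a hypothetical call site in the ShardingSpherePreconditions
style used elsewhere in the pipeline (the handler below throws it directly
from an if branch instead):

    ShardingSpherePreconditions.checkState(!tableNames.isEmpty(), NotFindStreamDataSourceTableException::new);
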
diff --git a/proxy/backend/src/main/java/org/apache/shardingsphere/proxy/backend/handler/cdc/CDCBackendHandler.java b/proxy/backend/src/main/java/org/apache/shardingsphere/proxy/backend/handler/cdc/CDCBackendHandler.java
index 8707da0276a..dbbdb7f60dc 100644
--- a/proxy/backend/src/main/java/org/apache/shardingsphere/proxy/backend/handler/cdc/CDCBackendHandler.java
+++ b/proxy/backend/src/main/java/org/apache/shardingsphere/proxy/backend/handler/cdc/CDCBackendHandler.java
@@ -29,6 +29,7 @@ import org.apache.shardingsphere.data.pipeline.cdc.context.CDCConnectionContext;
 import org.apache.shardingsphere.data.pipeline.cdc.core.ack.CDCAckHolder;
 import org.apache.shardingsphere.data.pipeline.cdc.core.importer.connector.CDCImporterConnector;
 import org.apache.shardingsphere.data.pipeline.cdc.core.job.CDCJob;
+import org.apache.shardingsphere.data.pipeline.cdc.exception.NotFindStreamDataSourceTableException;
 import org.apache.shardingsphere.data.pipeline.cdc.generator.CDCResponseGenerator;
 import org.apache.shardingsphere.data.pipeline.cdc.generator.DataRecordComparatorGenerator;
 import org.apache.shardingsphere.data.pipeline.cdc.protocol.request.AckStreamingRequestBody;
@@ -41,19 +42,28 @@ import org.apache.shardingsphere.data.pipeline.core.context.PipelineContext;
 import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobCenter;
 import org.apache.shardingsphere.elasticjob.infra.pojo.JobConfigurationPOJO;
 import org.apache.shardingsphere.elasticjob.lite.api.bootstrap.impl.OneOffJobBootstrap;
+import org.apache.shardingsphere.infra.database.type.DatabaseType;
+import org.apache.shardingsphere.infra.database.type.SchemaSupportedDatabaseType;
 import org.apache.shardingsphere.infra.database.type.dialect.OpenGaussDatabaseType;
 import org.apache.shardingsphere.infra.datanode.DataNode;
+import org.apache.shardingsphere.infra.exception.SchemaNotFoundException;
 import org.apache.shardingsphere.infra.metadata.database.ShardingSphereDatabase;
+import org.apache.shardingsphere.infra.metadata.database.schema.model.ShardingSphereSchema;
 import org.apache.shardingsphere.sharding.rule.ShardingRule;
 import org.apache.shardingsphere.sharding.rule.TableRule;
 
 import java.sql.SQLException;
+import java.util.Collection;
 import java.util.Comparator;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
+import java.util.Map.Entry;
 import java.util.Optional;
+import java.util.Set;
+import java.util.stream.Collectors;
 
 /**
  * CDC backend handler.
@@ -77,9 +87,19 @@ public final class CDCBackendHandler {
         if (null == database) {
             return CDCResponseGenerator.failed(requestId, CDCResponseErrorCode.SERVER_ERROR, String.format("%s database is not exists", requestBody.getDatabase()));
         }
-        List<String> schemaTableNames = new LinkedList<>();
-        for (SchemaTable each : requestBody.getSourceSchemaTablesList()) {
-            schemaTableNames.add(Strings.isNullOrEmpty(each.getSchema()) ? each.getTable() : String.join(".", each.getSchema(), each.getTable()));
+        Map<String, Collection<String>> schemaTableNameMap;
+        Collection<String> tableNames;
+        Set<String> schemaTableNames = new HashSet<>();
+        if (database.getProtocolType().isSchemaAvailable()) {
+            schemaTableNameMap = getSchemaTableMapWithSchema(database, requestBody.getSourceSchemaTablesList());
+            // TODO if different schema have same table names, table name may be overwritten, because the table name at sharding rule not contain schema.
+            tableNames = schemaTableNameMap.values().stream().flatMap(Collection::stream).collect(Collectors.toList());
+            schemaTableNameMap.forEach((k, v) -> v.forEach(tableName -> schemaTableNames.add(k.isEmpty() ? tableName : String.join(".", k, tableName))));
+        } else {
+            tableNames = getTableNamesWithoutSchema(database, requestBody.getSourceSchemaTablesList());
+        }
+        if (tableNames.isEmpty()) {
+            throw new NotFindStreamDataSourceTableException();
         }
         Optional<ShardingRule> shardingRule = database.getRuleMetaData().findSingleRule(ShardingRule.class);
         if (!shardingRule.isPresent()) {
@@ -87,17 +107,66 @@ public final class CDCBackendHandler {
         }
         Map<String, List<DataNode>> actualDataNodesMap = new HashMap<>();
         // TODO need support case-insensitive later
-        for (SchemaTable each : requestBody.getSourceSchemaTablesList()) {
-            actualDataNodesMap.put(each.getTable(), getActualDataNodes(shardingRule.get(), each.getTable()));
+        for (String each : tableNames) {
+            actualDataNodesMap.put(each, getActualDataNodes(shardingRule.get(), each));
         }
         boolean decodeWithTx = database.getProtocolType() instanceof OpenGaussDatabaseType;
-        StreamDataParameter parameter = new StreamDataParameter(requestBody.getDatabase(), schemaTableNames, requestBody.getFull(), actualDataNodesMap, decodeWithTx);
+        StreamDataParameter parameter = new StreamDataParameter(requestBody.getDatabase(), new LinkedList<>(schemaTableNames), requestBody.getFull(), actualDataNodesMap, decodeWithTx);
         String jobId = jobAPI.createJob(parameter);
         connectionContext.setJobId(jobId);
         startStreaming(requestId, jobId, connectionContext, channel);
         return CDCResponseGenerator.succeedBuilder(requestId).setStreamDataResult(StreamDataResult.newBuilder().setStreamingId(jobId).build()).build();
     }
     
+    private Map<String, Collection<String>> getSchemaTableMapWithSchema(final ShardingSphereDatabase database, final List<SchemaTable> schemaTables) {
+        Map<String, Collection<String>> result = new HashMap<>();
+        Collection<String> systemSchemas = database.getProtocolType().getSystemSchemas();
+        Optional<SchemaTable> allSchemaTablesOptional = schemaTables.stream().filter(each -> "*".equals(each.getTable()) && "*".equals(each.getSchema())).findFirst();
+        if (allSchemaTablesOptional.isPresent()) {
+            for (Entry<String, ShardingSphereSchema> entry : database.getSchemas().entrySet()) {
+                if (systemSchemas.contains(entry.getKey())) {
+                    continue;
+                }
+                entry.getValue().getAllTableNames().forEach(tableName -> result.computeIfAbsent(entry.getKey(), ignored -> new HashSet<>()).add(tableName));
+            }
+            return result;
+        }
+        for (SchemaTable each : schemaTables) {
+            if ("*".equals(each.getSchema())) {
+                for (Entry<String, ShardingSphereSchema> entry : database.getSchemas().entrySet()) {
+                    if (systemSchemas.contains(entry.getKey())) {
+                        continue;
+                    }
+                    entry.getValue().getAllTableNames().stream().filter(tableName -> tableName.equals(each.getTable())).findFirst()
+                            .ifPresent(tableName -> result.computeIfAbsent(entry.getKey(), ignored -> new HashSet<>()).add(tableName));
+                }
+            } else if ("*".equals(each.getTable())) {
+                String schemaName = each.getSchema().isEmpty() ? getDefaultSchema(database.getProtocolType()) : each.getSchema();
+                ShardingSphereSchema schema = database.getSchema(schemaName);
+                if (null == schema) {
+                    throw new SchemaNotFoundException(each.getSchema());
+                }
+                schema.getAllTableNames().forEach(tableName -> result.computeIfAbsent(schemaName, ignored -> new HashSet<>()).add(tableName));
+            } else {
+                result.computeIfAbsent(each.getSchema(), ignored -> new HashSet<>()).add(each.getTable());
+            }
+        }
+        return result;
+    }
+    
+    private String getDefaultSchema(final DatabaseType databaseType) {
+        if (!(databaseType instanceof SchemaSupportedDatabaseType)) {
+            return null;
+        }
+        return ((SchemaSupportedDatabaseType) databaseType).getDefaultSchema();
+    }
+    
+    private Collection<String> getTableNamesWithoutSchema(final ShardingSphereDatabase database, final List<SchemaTable> schemaTables) {
+        Optional<SchemaTable> allTablesOptional = schemaTables.stream().filter(each -> each.getTable().equals("*")).findFirst();
+        Set<String> allTableNames = new HashSet<>(database.getSchema(database.getName()).getAllTableNames());
+        return allTablesOptional.isPresent() ? allTableNames : schemaTables.stream().map(SchemaTable::getTable).collect(Collectors.toSet());
+    }
+    
     private List<DataNode> getActualDataNodes(final ShardingRule shardingRule, final String logicTableName) {
         TableRule tableRule = shardingRule.getTableRule(logicTableName);
         // TODO support virtual data source name
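
getSchemaTableMapWithSchema above resolves three wildcard forms. A worked
example with hypothetical metadata, mirroring the unit tests below: given the
non-system schemas public and test, each containing t_order and t_order_item,

    // schema "*",    table "*"       -> {public=[t_order, t_order_item], test=[t_order, t_order_item]}
    // schema "*",    table "t_order" -> {public=[t_order], test=[t_order]}
    // schema "test", table "*"       -> {test=[t_order, t_order_item]}
    // schema "",     table "t_order" -> {""=[t_order]}; the empty schema key is
    //                                   joined away later when schemaTableNames
    //                                   is assembled in streamData
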
diff --git a/proxy/backend/src/test/java/org/apache/shardingsphere/proxy/backend/handler/cdc/CDCBackendHandlerTest.java b/proxy/backend/src/test/java/org/apache/shardingsphere/proxy/backend/handler/cdc/CDCBackendHandlerTest.java
index 6bd48cb8efa..c382e5b439a 100644
--- a/proxy/backend/src/test/java/org/apache/shardingsphere/proxy/backend/handler/cdc/CDCBackendHandlerTest.java
+++ b/proxy/backend/src/test/java/org/apache/shardingsphere/proxy/backend/handler/cdc/CDCBackendHandlerTest.java
@@ -22,16 +22,21 @@ import org.apache.shardingsphere.data.pipeline.cdc.context.CDCConnectionContext;
 import org.apache.shardingsphere.data.pipeline.cdc.protocol.request.CDCRequest;
 import org.apache.shardingsphere.data.pipeline.cdc.protocol.request.CDCRequest.Type;
 import org.apache.shardingsphere.data.pipeline.cdc.protocol.request.StreamDataRequestBody;
+import org.apache.shardingsphere.data.pipeline.cdc.protocol.request.StreamDataRequestBody.SchemaTable;
 import org.apache.shardingsphere.data.pipeline.cdc.protocol.response.CDCResponse;
 import org.apache.shardingsphere.data.pipeline.cdc.protocol.response.CDCResponse.Status;
 import org.apache.shardingsphere.data.pipeline.core.context.PipelineContext;
 import org.apache.shardingsphere.infra.config.props.ConfigurationProperties;
 import org.apache.shardingsphere.infra.database.type.dialect.MySQLDatabaseType;
+import org.apache.shardingsphere.infra.database.type.dialect.OpenGaussDatabaseType;
 import org.apache.shardingsphere.infra.instance.InstanceContext;
 import org.apache.shardingsphere.infra.metadata.ShardingSphereMetaData;
 import org.apache.shardingsphere.infra.metadata.database.ShardingSphereDatabase;
 import org.apache.shardingsphere.infra.metadata.database.rule.ShardingSphereRuleMetaData;
+import org.apache.shardingsphere.infra.metadata.database.schema.model.ShardingSphereSchema;
+import org.apache.shardingsphere.infra.metadata.database.schema.model.ShardingSphereTable;
 import org.apache.shardingsphere.infra.rule.ShardingSphereRule;
+import org.apache.shardingsphere.infra.util.reflection.ReflectionUtil;
 import org.apache.shardingsphere.mode.manager.ContextManager;
 import org.apache.shardingsphere.mode.metadata.MetaDataContexts;
 import org.apache.shardingsphere.mode.metadata.persist.MetaDataPersistService;
@@ -41,8 +46,13 @@ import org.junit.BeforeClass;
 import org.junit.Test;
 import org.mockito.MockedStatic;
 
+import java.util.Arrays;
+import java.util.Collection;
 import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
 import java.util.LinkedHashMap;
+import java.util.List;
 import java.util.Map;
 import java.util.Properties;
 import java.util.Set;
@@ -91,4 +101,61 @@ public final class CDCBackendHandlerTest {
         CDCResponse actualResponse = handler.streamData(request.getRequestId(), request.getStreamDataRequestBody(), mock(CDCConnectionContext.class), mock(Channel.class));
         assertThat(actualResponse.getStatus(), is(Status.FAILED));
     }
+    
+    @Test
+    public void assertGetSchemaTableMapWithSchema() throws NoSuchMethodException {
+        Map<String, ShardingSphereSchema> schemas = new HashMap<>();
+        schemas.put("test", mockSchema());
+        schemas.put("public", mockSchema());
+        ShardingSphereDatabase database = new ShardingSphereDatabase("sharding_db", new OpenGaussDatabaseType(), null, null, schemas);
+        List<SchemaTable> schemaTables = Arrays.asList(SchemaTable.newBuilder().setSchema("public").setTable("t_order").build(),
+                SchemaTable.newBuilder().setSchema("test").setTable("*").build());
+        Map<String, Collection<String>> expected = new HashMap<>();
+        expected.put("test", new HashSet<>(Arrays.asList("t_order", "t_order_item")));
+        expected.put("public", new HashSet<>(Collections.singletonList("t_order")));
+        Map<String, String> actual = getSchemaTableMapWithSchemaResult(database, schemaTables);
+        assertThat(actual, is(expected));
+        schemaTables = Collections.singletonList(SchemaTable.newBuilder().setTable("t_order").build());
+        actual = getSchemaTableMapWithSchemaResult(database, schemaTables);
+        expected = Collections.singletonMap("", Collections.singleton("t_order"));
+        assertThat(actual, is(expected));
+        schemaTables = Collections.singletonList(SchemaTable.newBuilder().setSchema("*").setTable("t_order").build());
+        actual = getSchemaTableMapWithSchemaResult(database, schemaTables);
+        expected = new HashMap<>();
+        expected.put("test", new HashSet<>(Collections.singletonList("t_order")));
+        expected.put("public", new HashSet<>(Collections.singletonList("t_order")));
+        assertThat(actual, is(expected));
+    }
+    
+    private ShardingSphereSchema mockSchema() {
+        Map<String, ShardingSphereTable> tables = new HashMap<>();
+        tables.put("t_order", mock(ShardingSphereTable.class));
+        tables.put("t_order_item", mock(ShardingSphereTable.class));
+        return new ShardingSphereSchema(tables, Collections.emptyMap());
+    }
+    
+    private Map<String, String> getSchemaTableMapWithSchemaResult(final ShardingSphereDatabase database, final List<SchemaTable> schemaTables) throws NoSuchMethodException {
+        return ReflectionUtil.invokeMethod(handler.getClass().getDeclaredMethod("getSchemaTableMapWithSchema", ShardingSphereDatabase.class, List.class),
+                handler, database, schemaTables);
+    }
+    
+    @Test
+    public void assertGetSchemaTableMapWithoutSchema() throws NoSuchMethodException {
+        Map<String, ShardingSphereSchema> schemas = new HashMap<>();
+        schemas.put("sharding_db", mockSchema());
+        ShardingSphereDatabase database = new ShardingSphereDatabase("sharding_db", new MySQLDatabaseType(), null, null, schemas);
+        List<SchemaTable> schemaTables = Collections.singletonList(SchemaTable.newBuilder().setTable("*").build());
+        Collection<String> actualWildcardTable = getSchemaTableMapWithoutSchemaResult(database, schemaTables);
+        Set<String> expectedWildcardTable = new HashSet<>(Arrays.asList("t_order", "t_order_item"));
+        assertThat(actualWildcardTable, is(expectedWildcardTable));
+        schemaTables = Collections.singletonList(SchemaTable.newBuilder().setTable("t_order").build());
+        Collection<String> actualSingleTable = getSchemaTableMapWithoutSchemaResult(database, schemaTables);
+        Set<String> expectedSingleTable = new HashSet<>(Collections.singletonList("t_order"));
+        assertThat(actualSingleTable, is(expectedSingleTable));
+    }
+    
+    private Collection<String> getSchemaTableMapWithoutSchemaResult(final ShardingSphereDatabase database, final List<SchemaTable> schemaTables) throws NoSuchMethodException {
+        return ReflectionUtil.invokeMethod(handler.getClass().getDeclaredMethod("getTableNamesWithoutSchema", ShardingSphereDatabase.class, List.class),
+                handler, database, schemaTables);
+    }
 }
diff --git a/proxy/frontend/core/src/main/java/org/apache/shardingsphere/proxy/frontend/netty/CDCChannelInboundHandler.java b/proxy/frontend/core/src/main/java/org/apache/shardingsphere/proxy/frontend/netty/CDCChannelInboundHandler.java
index 1c9a9859269..674f74b9e17 100644
--- a/proxy/frontend/core/src/main/java/org/apache/shardingsphere/proxy/frontend/netty/CDCChannelInboundHandler.java
+++ b/proxy/frontend/core/src/main/java/org/apache/shardingsphere/proxy/frontend/netty/CDCChannelInboundHandler.java
@@ -173,7 +173,6 @@ public final class CDCChannelInboundHandler extends ChannelInboundHandlerAdapter
             ctx.writeAndFlush(CDCResponseGenerator.failed(request.getRequestId(), CDCResponseErrorCode.ILLEGAL_REQUEST_ERROR, "database not allowed to be empty"));
             return;
         }
-        // TODO need support the all tables at database or schema
         if (requestBody.getSourceSchemaTablesList().isEmpty()) {
             ctx.writeAndFlush(CDCResponseGenerator.failed(request.getRequestId(), CDCResponseErrorCode.ILLEGAL_REQUEST_ERROR, "Illegal stream data request parameter"));
             return;