You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@shardingsphere.apache.org by zh...@apache.org on 2023/02/17 05:59:50 UTC
[shardingsphere] branch master updated: CDC stream data source table name wildcard support (#24209)
This is an automated email from the ASF dual-hosted git repository.
zhonghongsheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new 0a14b167ab0 CDC stream data source table name wildcard support (#24209)
0a14b167ab0 is described below
commit 0a14b167ab066e79c2171e62692d3c748797ac59
Author: Xinze Guo <10...@users.noreply.github.com>
AuthorDate: Fri Feb 17 13:59:44 2023 +0800
CDC stream data source table name wildcard support (#24209)
* CDC stream data source table name wildcard support
* Add error code
* Improve
---
.../user-manual/error-code/sql-error-code.cn.md | 1 +
.../user-manual/error-code/sql-error-code.en.md | 1 +
kernel/data-pipeline/cdc/client/pom.xml | 4 ++
.../client/example/{opengauss => }/Bootstrap.java | 5 +-
.../data/pipeline/cdc/api/impl/CDCJobAPI.java | 8 +--
.../importer/connector/CDCImporterConnector.java | 6 +-
.../data/pipeline/cdc/core/job/CDCJobId.java | 6 +-
.../NotFindStreamDataSourceTableException.java | 33 +++++++++
.../backend/handler/cdc/CDCBackendHandler.java | 81 ++++++++++++++++++++--
.../backend/handler/cdc/CDCBackendHandlerTest.java | 67 ++++++++++++++++++
.../frontend/netty/CDCChannelInboundHandler.java | 1 -
11 files changed, 195 insertions(+), 18 deletions(-)
diff --git a/docs/document/content/user-manual/error-code/sql-error-code.cn.md b/docs/document/content/user-manual/error-code/sql-error-code.cn.md
index ad9cf82d337..211c412b93e 100644
--- a/docs/document/content/user-manual/error-code/sql-error-code.cn.md
+++ b/docs/document/content/user-manual/error-code/sql-error-code.cn.md
@@ -131,6 +131,7 @@ SQL 错误码以标准的 SQL State,Vendor Code 和详细错误信息提供,
| HY000 | 18093 | Can not poll event because of binlog sync channel already closed. |
| HY000 | 18095 | Can not find consistency check job of \`%s\`. |
| HY000 | 18096 | Uncompleted consistency check job \`%s\` exists. |
+| HY000 | 18200 | Not find stream data source table. |
### DistSQL
diff --git a/docs/document/content/user-manual/error-code/sql-error-code.en.md b/docs/document/content/user-manual/error-code/sql-error-code.en.md
index 7ba9c53a34a..932965e2904 100644
--- a/docs/document/content/user-manual/error-code/sql-error-code.en.md
+++ b/docs/document/content/user-manual/error-code/sql-error-code.en.md
@@ -131,6 +131,7 @@ SQL error codes provide by standard `SQL State`, `Vendor Code` and `Reason`, whi
| HY000 | 18093 | Can not poll event because of binlog sync channel already closed. |
| HY000 | 18095 | Can not find consistency check job of \`%s\`. |
| HY000 | 18096 | Uncompleted consistency check job \`%s\` exists. |
+| HY000 | 18200 | Not find stream data source table. |
### DistSQL
diff --git a/kernel/data-pipeline/cdc/client/pom.xml b/kernel/data-pipeline/cdc/client/pom.xml
index f549dbb815f..49f407aaf26 100644
--- a/kernel/data-pipeline/cdc/client/pom.xml
+++ b/kernel/data-pipeline/cdc/client/pom.xml
@@ -47,6 +47,10 @@
<groupId>org.opengauss</groupId>
<artifactId>opengauss-jdbc</artifactId>
</dependency>
+ <dependency>
+ <groupId>org.postgresql</groupId>
+ <artifactId>postgresql</artifactId>
+ </dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
diff --git a/kernel/data-pipeline/cdc/client/src/test/java/org/apache/shardingsphere/data/pipeline/cdc/client/example/opengauss/Bootstrap.java b/kernel/data-pipeline/cdc/client/src/test/java/org/apache/shardingsphere/data/pipeline/cdc/client/example/Bootstrap.java
similarity index 92%
rename from kernel/data-pipeline/cdc/client/src/test/java/org/apache/shardingsphere/data/pipeline/cdc/client/example/opengauss/Bootstrap.java
rename to kernel/data-pipeline/cdc/client/src/test/java/org/apache/shardingsphere/data/pipeline/cdc/client/example/Bootstrap.java
index 5bafbec1ecf..a6b51537e75 100644
--- a/kernel/data-pipeline/cdc/client/src/test/java/org/apache/shardingsphere/data/pipeline/cdc/client/example/opengauss/Bootstrap.java
+++ b/kernel/data-pipeline/cdc/client/src/test/java/org/apache/shardingsphere/data/pipeline/cdc/client/example/Bootstrap.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.shardingsphere.data.pipeline.cdc.client.example.opengauss;
+package org.apache.shardingsphere.data.pipeline.cdc.client.example;
import org.apache.shardingsphere.data.pipeline.cdc.client.CDCClient;
import org.apache.shardingsphere.data.pipeline.cdc.client.parameter.ImportDataSourceParameter;
@@ -32,7 +32,7 @@ public final class Bootstrap {
* @param args args
*/
public static void main(final String[] args) {
- ImportDataSourceParameter importDataSourceParam = new ImportDataSourceParameter("jdbc:opengauss:localhost:5432/cdc_db?stringtype=unspecified", "gaussdb", "Root@123");
+ ImportDataSourceParameter importDataSourceParam = new ImportDataSourceParameter("jdbc:opengauss://localhost:5432/cdc_db?stringtype=unspecified", "gaussdb", "Root@123");
StartCDCClientParameter parameter = new StartCDCClientParameter(importDataSourceParam);
parameter.setAddress("127.0.0.1");
parameter.setPort(33071);
@@ -41,6 +41,7 @@ public final class Bootstrap {
parameter.setDatabase("sharding_db");
parameter.setFull(true);
parameter.setSchemaTables(Collections.singletonList(SchemaTable.newBuilder().setTable("t_order").build()));
+ // support MySQL, PostgreSQL, openGauss
parameter.setDatabaseType("openGauss");
CDCClient cdcClient = new CDCClient(parameter);
cdcClient.start();
diff --git a/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPI.java b/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPI.java
index bf7b9e97c9d..30ad2964e33 100644
--- a/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPI.java
+++ b/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPI.java
@@ -201,7 +201,7 @@ public final class CDCJobAPI extends AbstractInventoryIncrementalJobAPIImpl {
@Override
protected String marshalJobIdLeftPart(final PipelineJobId pipelineJobId) {
CDCJobId jobId = (CDCJobId) pipelineJobId;
- String text = Joiner.on('|').join(jobId.getDatabaseName(), jobId.getTableNames(), jobId.isFull());
+ String text = Joiner.on('|').join(jobId.getDatabaseName(), jobId.getSchemaTableNames(), jobId.isFull());
return DigestUtils.md5Hex(text.getBytes(StandardCharsets.UTF_8));
}
@@ -216,7 +216,7 @@ public final class CDCJobAPI extends AbstractInventoryIncrementalJobAPIImpl {
return result;
}
- private TableNameSchemaNameMapping getTableNameSchemaNameMapping(final List<String> tableNames) {
+ private TableNameSchemaNameMapping getTableNameSchemaNameMapping(final Collection<String> tableNames) {
Map<LogicTableName, String> tableNameSchemaMap = new LinkedHashMap<>();
for (String each : tableNames) {
String[] split = each.split("\\.");
@@ -243,7 +243,7 @@ public final class CDCJobAPI extends AbstractInventoryIncrementalJobAPIImpl {
return result;
}
- private ImporterConfiguration buildImporterConfiguration(final CDCJobConfiguration jobConfig, final PipelineProcessConfiguration pipelineProcessConfig, final List<String> logicalTableNames,
+ private ImporterConfiguration buildImporterConfiguration(final CDCJobConfiguration jobConfig, final PipelineProcessConfiguration pipelineProcessConfig, final Collection<String> schemaTableNames,
final TableNameSchemaNameMapping tableNameSchemaNameMapping) {
PipelineDataSourceConfiguration dataSourceConfig = PipelineDataSourceConfigurationFactory.newInstance(jobConfig.getDataSourceConfig().getType(),
jobConfig.getDataSourceConfig().getParameter());
@@ -251,7 +251,7 @@ public final class CDCJobAPI extends AbstractInventoryIncrementalJobAPIImpl {
JobRateLimitAlgorithm writeRateLimitAlgorithm = processContext.getWriteRateLimitAlgorithm();
int batchSize = pipelineProcessConfig.getWrite().getBatchSize();
Map<LogicTableName, Set<String>> shardingColumnsMap = new ShardingColumnsExtractor()
- .getShardingColumnsMap(jobConfig.getDataSourceConfig().getRootConfig().getRules(), logicalTableNames.stream().map(LogicTableName::new).collect(Collectors.toSet()));
+ .getShardingColumnsMap(jobConfig.getDataSourceConfig().getRootConfig().getRules(), schemaTableNames.stream().map(LogicTableName::new).collect(Collectors.toSet()));
return new ImporterConfiguration(dataSourceConfig, shardingColumnsMap, tableNameSchemaNameMapping, batchSize, writeRateLimitAlgorithm, 0, 1);
}
diff --git a/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/importer/connector/CDCImporterConnector.java b/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/importer/connector/CDCImporterConnector.java
index d1b3e8e90bc..77ccf10446c 100644
--- a/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/importer/connector/CDCImporterConnector.java
+++ b/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/importer/connector/CDCImporterConnector.java
@@ -38,6 +38,7 @@ import org.apache.shardingsphere.data.pipeline.core.util.ThreadUtil;
import org.apache.shardingsphere.data.pipeline.spi.importer.ImporterType;
import org.apache.shardingsphere.data.pipeline.spi.importer.connector.ImporterConnector;
+import java.util.Collection;
import java.util.Comparator;
import java.util.HashMap;
import java.util.LinkedList;
@@ -84,11 +85,12 @@ public final class CDCImporterConnector implements ImporterConnector {
private Thread incrementalImporterTask;
- public CDCImporterConnector(final Channel channel, final String database, final int jobShardingCount, final List<String> tableNames, final Comparator<DataRecord> dataRecordComparator) {
+ public CDCImporterConnector(final Channel channel, final String database, final int jobShardingCount, final Collection<String> schemaTableNames,
+ final Comparator<DataRecord> dataRecordComparator) {
this.channel = channel;
this.database = database;
this.jobShardingCount = jobShardingCount;
- tableNames.stream().filter(each -> each.contains(".")).forEach(each -> {
+ schemaTableNames.stream().filter(each -> each.contains(".")).forEach(each -> {
String[] split = each.split("\\.");
tableNameSchemaMap.put(split[1], split[0]);
});
diff --git a/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/job/CDCJobId.java b/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/job/CDCJobId.java
index 0182c35a74b..1a6590b8f5c 100644
--- a/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/job/CDCJobId.java
+++ b/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/job/CDCJobId.java
@@ -35,14 +35,14 @@ public final class CDCJobId extends AbstractPipelineJobId {
private final String databaseName;
- private final List<String> tableNames;
+ private final List<String> schemaTableNames;
private final boolean full;
- public CDCJobId(final String databaseName, final List<String> tableNames, final boolean full) {
+ public CDCJobId(final String databaseName, final List<String> schemaTableNames, final boolean full) {
super(new CDCJobType(), CURRENT_VERSION);
this.databaseName = databaseName;
- this.tableNames = tableNames;
+ this.schemaTableNames = schemaTableNames;
this.full = full;
}
}
diff --git a/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/exception/NotFindStreamDataSourceTableException.java b/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/exception/NotFindStreamDataSourceTableException.java
new file mode 100644
index 00000000000..fadddcace75
--- /dev/null
+++ b/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/exception/NotFindStreamDataSourceTableException.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.data.pipeline.cdc.exception;
+
+import org.apache.shardingsphere.data.pipeline.core.exception.PipelineSQLException;
+import org.apache.shardingsphere.infra.util.exception.external.sql.sqlstate.XOpenSQLState;
+
+/**
+ * Not find stream data source table exception.
+ */
+public final class NotFindStreamDataSourceTableException extends PipelineSQLException {
+
+ private static final long serialVersionUID = 4003436152767041454L;
+
+ public NotFindStreamDataSourceTableException() {
+ super(XOpenSQLState.GENERAL_ERROR, 200, "Not find stream data source table exception");
+ }
+}
diff --git a/proxy/backend/src/main/java/org/apache/shardingsphere/proxy/backend/handler/cdc/CDCBackendHandler.java b/proxy/backend/src/main/java/org/apache/shardingsphere/proxy/backend/handler/cdc/CDCBackendHandler.java
index 8707da0276a..dbbdb7f60dc 100644
--- a/proxy/backend/src/main/java/org/apache/shardingsphere/proxy/backend/handler/cdc/CDCBackendHandler.java
+++ b/proxy/backend/src/main/java/org/apache/shardingsphere/proxy/backend/handler/cdc/CDCBackendHandler.java
@@ -29,6 +29,7 @@ import org.apache.shardingsphere.data.pipeline.cdc.context.CDCConnectionContext;
import org.apache.shardingsphere.data.pipeline.cdc.core.ack.CDCAckHolder;
import org.apache.shardingsphere.data.pipeline.cdc.core.importer.connector.CDCImporterConnector;
import org.apache.shardingsphere.data.pipeline.cdc.core.job.CDCJob;
+import org.apache.shardingsphere.data.pipeline.cdc.exception.NotFindStreamDataSourceTableException;
import org.apache.shardingsphere.data.pipeline.cdc.generator.CDCResponseGenerator;
import org.apache.shardingsphere.data.pipeline.cdc.generator.DataRecordComparatorGenerator;
import org.apache.shardingsphere.data.pipeline.cdc.protocol.request.AckStreamingRequestBody;
@@ -41,19 +42,28 @@ import org.apache.shardingsphere.data.pipeline.core.context.PipelineContext;
import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobCenter;
import org.apache.shardingsphere.elasticjob.infra.pojo.JobConfigurationPOJO;
import org.apache.shardingsphere.elasticjob.lite.api.bootstrap.impl.OneOffJobBootstrap;
+import org.apache.shardingsphere.infra.database.type.DatabaseType;
+import org.apache.shardingsphere.infra.database.type.SchemaSupportedDatabaseType;
import org.apache.shardingsphere.infra.database.type.dialect.OpenGaussDatabaseType;
import org.apache.shardingsphere.infra.datanode.DataNode;
+import org.apache.shardingsphere.infra.exception.SchemaNotFoundException;
import org.apache.shardingsphere.infra.metadata.database.ShardingSphereDatabase;
+import org.apache.shardingsphere.infra.metadata.database.schema.model.ShardingSphereSchema;
import org.apache.shardingsphere.sharding.rule.ShardingRule;
import org.apache.shardingsphere.sharding.rule.TableRule;
import java.sql.SQLException;
+import java.util.Collection;
import java.util.Comparator;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
+import java.util.Map.Entry;
import java.util.Optional;
+import java.util.Set;
+import java.util.stream.Collectors;
/**
* CDC backend handler.
@@ -77,9 +87,19 @@ public final class CDCBackendHandler {
if (null == database) {
return CDCResponseGenerator.failed(requestId, CDCResponseErrorCode.SERVER_ERROR, String.format("%s database is not exists", requestBody.getDatabase()));
}
- List<String> schemaTableNames = new LinkedList<>();
- for (SchemaTable each : requestBody.getSourceSchemaTablesList()) {
- schemaTableNames.add(Strings.isNullOrEmpty(each.getSchema()) ? each.getTable() : String.join(".", each.getSchema(), each.getTable()));
+ Map<String, Collection<String>> schemaTableNameMap;
+ Collection<String> tableNames;
+ Set<String> schemaTableNames = new HashSet<>();
+ if (database.getProtocolType().isSchemaAvailable()) {
+ schemaTableNameMap = getSchemaTableMapWithSchema(database, requestBody.getSourceSchemaTablesList());
+ // TODO if different schema have same table names, table name may be overwritten, because the table name at sharding rule not contain schema.
+ tableNames = schemaTableNameMap.values().stream().flatMap(Collection::stream).collect(Collectors.toList());
+ schemaTableNameMap.forEach((k, v) -> v.forEach(tableName -> schemaTableNames.add(k.isEmpty() ? tableName : String.join(".", k, tableName))));
+ } else {
+ tableNames = getTableNamesWithoutSchema(database, requestBody.getSourceSchemaTablesList());
+ }
+ if (tableNames.isEmpty()) {
+ throw new NotFindStreamDataSourceTableException();
}
Optional<ShardingRule> shardingRule = database.getRuleMetaData().findSingleRule(ShardingRule.class);
if (!shardingRule.isPresent()) {
@@ -87,17 +107,66 @@ public final class CDCBackendHandler {
}
Map<String, List<DataNode>> actualDataNodesMap = new HashMap<>();
// TODO need support case-insensitive later
- for (SchemaTable each : requestBody.getSourceSchemaTablesList()) {
- actualDataNodesMap.put(each.getTable(), getActualDataNodes(shardingRule.get(), each.getTable()));
+ for (String each : tableNames) {
+ actualDataNodesMap.put(each, getActualDataNodes(shardingRule.get(), each));
}
boolean decodeWithTx = database.getProtocolType() instanceof OpenGaussDatabaseType;
- StreamDataParameter parameter = new StreamDataParameter(requestBody.getDatabase(), schemaTableNames, requestBody.getFull(), actualDataNodesMap, decodeWithTx);
+ StreamDataParameter parameter = new StreamDataParameter(requestBody.getDatabase(), new LinkedList<>(schemaTableNames), requestBody.getFull(), actualDataNodesMap, decodeWithTx);
String jobId = jobAPI.createJob(parameter);
connectionContext.setJobId(jobId);
startStreaming(requestId, jobId, connectionContext, channel);
return CDCResponseGenerator.succeedBuilder(requestId).setStreamDataResult(StreamDataResult.newBuilder().setStreamingId(jobId).build()).build();
}
+ private Map<String, Collection<String>> getSchemaTableMapWithSchema(final ShardingSphereDatabase database, final List<SchemaTable> schemaTables) {
+ Map<String, Collection<String>> result = new HashMap<>();
+ Collection<String> systemSchemas = database.getProtocolType().getSystemSchemas();
+ Optional<SchemaTable> allSchemaTablesOptional = schemaTables.stream().filter(each -> "*".equals(each.getTable()) && "*".equals(each.getSchema())).findFirst();
+ if (allSchemaTablesOptional.isPresent()) {
+ for (Entry<String, ShardingSphereSchema> entry : database.getSchemas().entrySet()) {
+ if (systemSchemas.contains(entry.getKey())) {
+ continue;
+ }
+ entry.getValue().getAllTableNames().forEach(tableName -> result.computeIfAbsent(entry.getKey(), ignored -> new HashSet<>()).add(tableName));
+ }
+ return result;
+ }
+ for (SchemaTable each : schemaTables) {
+ if ("*".equals(each.getSchema())) {
+ for (Entry<String, ShardingSphereSchema> entry : database.getSchemas().entrySet()) {
+ if (systemSchemas.contains(entry.getKey())) {
+ continue;
+ }
+ entry.getValue().getAllTableNames().stream().filter(tableName -> tableName.equals(each.getTable())).findFirst()
+ .ifPresent(tableName -> result.computeIfAbsent(entry.getKey(), ignored -> new HashSet<>()).add(tableName));
+ }
+ } else if ("*".equals(each.getTable())) {
+ String schemaName = each.getSchema().isEmpty() ? getDefaultSchema(database.getProtocolType()) : each.getSchema();
+ ShardingSphereSchema schema = database.getSchema(schemaName);
+ if (null == schema) {
+ throw new SchemaNotFoundException(each.getSchema());
+ }
+ schema.getAllTableNames().forEach(tableName -> result.computeIfAbsent(schemaName, ignored -> new HashSet<>()).add(tableName));
+ } else {
+ result.computeIfAbsent(each.getSchema(), ignored -> new HashSet<>()).add(each.getTable());
+ }
+ }
+ return result;
+ }
+
+ private String getDefaultSchema(final DatabaseType databaseType) {
+ if (!(databaseType instanceof SchemaSupportedDatabaseType)) {
+ return null;
+ }
+ return ((SchemaSupportedDatabaseType) databaseType).getDefaultSchema();
+ }
+
+ private Collection<String> getTableNamesWithoutSchema(final ShardingSphereDatabase database, final List<SchemaTable> schemaTables) {
+ Optional<SchemaTable> allTablesOptional = schemaTables.stream().filter(each -> each.getTable().equals("*")).findFirst();
+ Set<String> allTableNames = new HashSet<>(database.getSchema(database.getName()).getAllTableNames());
+ return allTablesOptional.isPresent() ? allTableNames : schemaTables.stream().map(SchemaTable::getTable).collect(Collectors.toSet());
+ }
+
private List<DataNode> getActualDataNodes(final ShardingRule shardingRule, final String logicTableName) {
TableRule tableRule = shardingRule.getTableRule(logicTableName);
// TODO support virtual data source name
diff --git a/proxy/backend/src/test/java/org/apache/shardingsphere/proxy/backend/handler/cdc/CDCBackendHandlerTest.java b/proxy/backend/src/test/java/org/apache/shardingsphere/proxy/backend/handler/cdc/CDCBackendHandlerTest.java
index 6bd48cb8efa..c382e5b439a 100644
--- a/proxy/backend/src/test/java/org/apache/shardingsphere/proxy/backend/handler/cdc/CDCBackendHandlerTest.java
+++ b/proxy/backend/src/test/java/org/apache/shardingsphere/proxy/backend/handler/cdc/CDCBackendHandlerTest.java
@@ -22,16 +22,21 @@ import org.apache.shardingsphere.data.pipeline.cdc.context.CDCConnectionContext;
import org.apache.shardingsphere.data.pipeline.cdc.protocol.request.CDCRequest;
import org.apache.shardingsphere.data.pipeline.cdc.protocol.request.CDCRequest.Type;
import org.apache.shardingsphere.data.pipeline.cdc.protocol.request.StreamDataRequestBody;
+import org.apache.shardingsphere.data.pipeline.cdc.protocol.request.StreamDataRequestBody.SchemaTable;
import org.apache.shardingsphere.data.pipeline.cdc.protocol.response.CDCResponse;
import org.apache.shardingsphere.data.pipeline.cdc.protocol.response.CDCResponse.Status;
import org.apache.shardingsphere.data.pipeline.core.context.PipelineContext;
import org.apache.shardingsphere.infra.config.props.ConfigurationProperties;
import org.apache.shardingsphere.infra.database.type.dialect.MySQLDatabaseType;
+import org.apache.shardingsphere.infra.database.type.dialect.OpenGaussDatabaseType;
import org.apache.shardingsphere.infra.instance.InstanceContext;
import org.apache.shardingsphere.infra.metadata.ShardingSphereMetaData;
import org.apache.shardingsphere.infra.metadata.database.ShardingSphereDatabase;
import org.apache.shardingsphere.infra.metadata.database.rule.ShardingSphereRuleMetaData;
+import org.apache.shardingsphere.infra.metadata.database.schema.model.ShardingSphereSchema;
+import org.apache.shardingsphere.infra.metadata.database.schema.model.ShardingSphereTable;
import org.apache.shardingsphere.infra.rule.ShardingSphereRule;
+import org.apache.shardingsphere.infra.util.reflection.ReflectionUtil;
import org.apache.shardingsphere.mode.manager.ContextManager;
import org.apache.shardingsphere.mode.metadata.MetaDataContexts;
import org.apache.shardingsphere.mode.metadata.persist.MetaDataPersistService;
@@ -41,8 +46,13 @@ import org.junit.BeforeClass;
import org.junit.Test;
import org.mockito.MockedStatic;
+import java.util.Arrays;
+import java.util.Collection;
import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
import java.util.LinkedHashMap;
+import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
@@ -91,4 +101,61 @@ public final class CDCBackendHandlerTest {
CDCResponse actualResponse = handler.streamData(request.getRequestId(), request.getStreamDataRequestBody(), mock(CDCConnectionContext.class), mock(Channel.class));
assertThat(actualResponse.getStatus(), is(Status.FAILED));
}
+
+ @Test
+ public void assertGetSchemaTableMapWithSchema() throws NoSuchMethodException {
+ Map<String, ShardingSphereSchema> schemas = new HashMap<>();
+ schemas.put("test", mockSchema());
+ schemas.put("public", mockSchema());
+ ShardingSphereDatabase database = new ShardingSphereDatabase("sharding_db", new OpenGaussDatabaseType(), null, null, schemas);
+ List<SchemaTable> schemaTables = Arrays.asList(SchemaTable.newBuilder().setSchema("public").setTable("t_order").build(),
+ SchemaTable.newBuilder().setSchema("test").setTable("*").build());
+ Map<String, Collection<String>> expected = new HashMap<>();
+ expected.put("test", new HashSet<>(Arrays.asList("t_order", "t_order_item")));
+ expected.put("public", new HashSet<>(Collections.singletonList("t_order")));
+ Map<String, String> actual = getSchemaTableMapWithSchemaResult(database, schemaTables);
+ assertThat(actual, is(expected));
+ schemaTables = Collections.singletonList(SchemaTable.newBuilder().setTable("t_order").build());
+ actual = getSchemaTableMapWithSchemaResult(database, schemaTables);
+ expected = Collections.singletonMap("", Collections.singleton("t_order"));
+ assertThat(actual, is(expected));
+ schemaTables = Collections.singletonList(SchemaTable.newBuilder().setSchema("*").setTable("t_order").build());
+ actual = getSchemaTableMapWithSchemaResult(database, schemaTables);
+ expected = new HashMap<>();
+ expected.put("test", new HashSet<>(Collections.singletonList("t_order")));
+ expected.put("public", new HashSet<>(Collections.singletonList("t_order")));
+ assertThat(actual, is(expected));
+ }
+
+ private ShardingSphereSchema mockSchema() {
+ Map<String, ShardingSphereTable> tables = new HashMap<>();
+ tables.put("t_order", mock(ShardingSphereTable.class));
+ tables.put("t_order_item", mock(ShardingSphereTable.class));
+ return new ShardingSphereSchema(tables, Collections.emptyMap());
+ }
+
+ private Map<String, String> getSchemaTableMapWithSchemaResult(final ShardingSphereDatabase database, final List<SchemaTable> schemaTables) throws NoSuchMethodException {
+ return ReflectionUtil.invokeMethod(handler.getClass().getDeclaredMethod("getSchemaTableMapWithSchema", ShardingSphereDatabase.class, List.class),
+ handler, database, schemaTables);
+ }
+
+ @Test
+ public void assertGetSchemaTableMapWithoutSchema() throws NoSuchMethodException {
+ Map<String, ShardingSphereSchema> schemas = new HashMap<>();
+ schemas.put("sharding_db", mockSchema());
+ ShardingSphereDatabase database = new ShardingSphereDatabase("sharding_db", new MySQLDatabaseType(), null, null, schemas);
+ List<SchemaTable> schemaTables = Collections.singletonList(SchemaTable.newBuilder().setTable("*").build());
+ Collection<String> actualWildcardTable = getSchemaTableMapWithoutSchemaResult(database, schemaTables);
+ Set<String> expectedWildcardTable = new HashSet<>(Arrays.asList("t_order", "t_order_item"));
+ assertThat(actualWildcardTable, is(expectedWildcardTable));
+ schemaTables = Collections.singletonList(SchemaTable.newBuilder().setTable("t_order").build());
+ Collection<String> actualSingleTable = getSchemaTableMapWithoutSchemaResult(database, schemaTables);
+ Set<String> expectedSingleTable = new HashSet<>(Collections.singletonList("t_order"));
+ assertThat(actualSingleTable, is(expectedSingleTable));
+ }
+
+ private Collection<String> getSchemaTableMapWithoutSchemaResult(final ShardingSphereDatabase database, final List<SchemaTable> schemaTables) throws NoSuchMethodException {
+ return ReflectionUtil.invokeMethod(handler.getClass().getDeclaredMethod("getTableNamesWithoutSchema", ShardingSphereDatabase.class, List.class),
+ handler, database, schemaTables);
+ }
}
diff --git a/proxy/frontend/core/src/main/java/org/apache/shardingsphere/proxy/frontend/netty/CDCChannelInboundHandler.java b/proxy/frontend/core/src/main/java/org/apache/shardingsphere/proxy/frontend/netty/CDCChannelInboundHandler.java
index 1c9a9859269..674f74b9e17 100644
--- a/proxy/frontend/core/src/main/java/org/apache/shardingsphere/proxy/frontend/netty/CDCChannelInboundHandler.java
+++ b/proxy/frontend/core/src/main/java/org/apache/shardingsphere/proxy/frontend/netty/CDCChannelInboundHandler.java
@@ -173,7 +173,6 @@ public final class CDCChannelInboundHandler extends ChannelInboundHandlerAdapter
ctx.writeAndFlush(CDCResponseGenerator.failed(request.getRequestId(), CDCResponseErrorCode.ILLEGAL_REQUEST_ERROR, "database not allowed to be empty"));
return;
}
- // TODO need support the all tables at database or schema
if (requestBody.getSourceSchemaTablesList().isEmpty()) {
ctx.writeAndFlush(CDCResponseGenerator.failed(request.getRequestId(), CDCResponseErrorCode.ILLEGAL_REQUEST_ERROR, "Illegal stream data request parameter"));
return;