You are viewing a plain-text version of this content. The canonical link to the original message was not preserved in this copy.
Posted to notifications@shardingsphere.apache.org by zh...@apache.org on 2023/05/08 11:40:40 UTC
[shardingsphere] branch master updated: CDC support single table (#25489)
This is an automated email from the ASF dual-hosted git repository.
zhonghongsheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new 6d3dfdfedd9 CDC support single table (#25489)
6d3dfdfedd9 is described below
commit 6d3dfdfedd9271ad9f48e663352357926f39e448
Author: Xinze Guo <10...@users.noreply.github.com>
AuthorDate: Mon May 8 19:40:23 2023 +0800
CDC support single table (#25489)
* CDC support single table
* Add invalid parameter check
---
.../pipeline/cdc/handler/CDCBackendHandler.java | 32 +++++++++++----
.../pipeline/cdc/util/CDCSchemaTableUtils.java | 4 +-
.../data/pipeline/cdc/util/CDCTableRuleUtils.java | 46 ----------------------
.../frontend/netty/CDCChannelInboundHandler.java | 4 +-
4 files changed, 29 insertions(+), 57 deletions(-)
diff --git a/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/handler/CDCBackendHandler.java b/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/handler/CDCBackendHandler.java
index 04d86bea04f..e5fe2db24bf 100644
--- a/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/handler/CDCBackendHandler.java
+++ b/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/handler/CDCBackendHandler.java
@@ -42,15 +42,17 @@ import org.apache.shardingsphere.data.pipeline.cdc.protocol.request.StreamDataRe
import org.apache.shardingsphere.data.pipeline.cdc.protocol.response.CDCResponse;
import org.apache.shardingsphere.data.pipeline.cdc.protocol.response.StreamDataResult;
import org.apache.shardingsphere.data.pipeline.cdc.util.CDCSchemaTableUtils;
-import org.apache.shardingsphere.data.pipeline.cdc.util.CDCTableRuleUtils;
import org.apache.shardingsphere.data.pipeline.core.context.PipelineContextManager;
import org.apache.shardingsphere.data.pipeline.core.exception.job.PipelineJobNotFoundException;
+import org.apache.shardingsphere.data.pipeline.core.exception.metadata.NoAnyRuleExistsException;
+import org.apache.shardingsphere.data.pipeline.core.exception.param.PipelineInvalidParameterException;
import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobCenter;
import org.apache.shardingsphere.infra.database.type.dialect.OpenGaussDatabaseType;
import org.apache.shardingsphere.infra.datanode.DataNode;
import org.apache.shardingsphere.infra.metadata.database.ShardingSphereDatabase;
import org.apache.shardingsphere.infra.util.exception.ShardingSpherePreconditions;
import org.apache.shardingsphere.sharding.rule.ShardingRule;
+import org.apache.shardingsphere.single.rule.SingleRule;
import java.sql.SQLException;
import java.util.ArrayList;
@@ -110,12 +112,8 @@ public final class CDCBackendHandler {
tableNames = schemaTableNames;
}
ShardingSpherePreconditions.checkState(!tableNames.isEmpty(), () -> new CDCExceptionWrapper(requestId, new NotFindStreamDataSourceTableException()));
- ShardingRule shardingRule = database.getRuleMetaData().getSingleRule(ShardingRule.class);
- Map<String, List<DataNode>> actualDataNodesMap = new HashMap<>();
- // TODO need support case-insensitive later
- for (String each : tableNames) {
- actualDataNodesMap.put(each, CDCTableRuleUtils.getActualDataNodes(shardingRule, each));
- }
+ Map<String, List<DataNode>> actualDataNodesMap = buildDataNodesMap(database, tableNames);
+ ShardingSpherePreconditions.checkState(!actualDataNodesMap.isEmpty(), () -> new PipelineInvalidParameterException(String.format("Not find table %s", tableNames)));
boolean decodeWithTx = database.getProtocolType() instanceof OpenGaussDatabaseType;
StreamDataParameter parameter = new StreamDataParameter(requestBody.getDatabase(), new LinkedList<>(schemaTableNames), requestBody.getFull(), actualDataNodesMap, decodeWithTx);
String jobId = jobAPI.createJob(parameter, CDCSinkType.SOCKET, new Properties());
@@ -124,6 +122,24 @@ public final class CDCBackendHandler {
return CDCResponseGenerator.succeedBuilder(requestId).setStreamDataResult(StreamDataResult.newBuilder().setStreamingId(jobId).build()).build();
}
+ private Map<String, List<DataNode>> buildDataNodesMap(final ShardingSphereDatabase database, final Collection<String> tableNames) {
+ Map<String, List<DataNode>> result = new HashMap<>();
+ Optional<ShardingRule> shardingRule = database.getRuleMetaData().findSingleRule(ShardingRule.class);
+ Optional<SingleRule> singleRule = database.getRuleMetaData().findSingleRule(SingleRule.class);
+ if (!shardingRule.isPresent() && !singleRule.isPresent()) {
+ throw new NoAnyRuleExistsException(database.getName());
+ }
+ // TODO support virtual data source name
+ for (String each : tableNames) {
+ if (singleRule.isPresent() && singleRule.get().getAllDataNodes().containsKey(each)) {
+ result.put(each, new ArrayList<>(singleRule.get().getAllDataNodes().get(each)));
+ continue;
+ }
+ shardingRule.flatMap(value -> value.findTableRule(each)).ifPresent(rule -> result.put(each, rule.getActualDataNodes()));
+ }
+ return result;
+ }
+
/**
* Start streaming.
*
@@ -148,7 +164,7 @@ public final class CDCBackendHandler {
/**
* Stop streaming.
*
- * @param jobId job id
+ * @param jobId job id
* @param channelId channel id
*/
public void stopStreaming(final String jobId, final ChannelId channelId) {
diff --git a/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/util/CDCSchemaTableUtils.java b/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/util/CDCSchemaTableUtils.java
index d2b82b4de0c..34373ece4eb 100644
--- a/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/util/CDCSchemaTableUtils.java
+++ b/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/util/CDCSchemaTableUtils.java
@@ -28,6 +28,7 @@ import org.apache.shardingsphere.infra.metadata.database.schema.model.ShardingSp
import org.apache.shardingsphere.infra.util.exception.ShardingSpherePreconditions;
import java.util.Collection;
+import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
@@ -99,7 +100,8 @@ public final class CDCSchemaTableUtils {
*/
public static Collection<String> parseTableExpressionWithoutSchema(final ShardingSphereDatabase database, final List<String> tableNames) {
Optional<String> allTablesOptional = tableNames.stream().filter("*"::equals).findFirst();
- Set<String> allTableNames = new HashSet<>(database.getSchema(database.getName()).getAllTableNames());
+ ShardingSphereSchema schema = database.getSchema(database.getName());
+ Set<String> allTableNames = null == schema ? Collections.emptySet() : new HashSet<>(schema.getAllTableNames());
return allTablesOptional.isPresent() ? allTableNames : new HashSet<>(tableNames);
}
}
diff --git a/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/util/CDCTableRuleUtils.java b/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/util/CDCTableRuleUtils.java
deleted file mode 100644
index e3360346ac4..00000000000
--- a/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/util/CDCTableRuleUtils.java
+++ /dev/null
@@ -1,46 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.shardingsphere.data.pipeline.cdc.util;
-
-import lombok.AccessLevel;
-import lombok.NoArgsConstructor;
-import org.apache.shardingsphere.infra.datanode.DataNode;
-import org.apache.shardingsphere.sharding.rule.ShardingRule;
-import org.apache.shardingsphere.sharding.rule.TableRule;
-
-import java.util.List;
-
-/**
- * CDC table rule utility class.
- */
-@NoArgsConstructor(access = AccessLevel.PRIVATE)
-public final class CDCTableRuleUtils {
-
- /**
- * Get actual data nodes.
- *
- * @param shardingRule sharding rule
- * @param tableName table name
- * @return data nodes
- */
- public static List<DataNode> getActualDataNodes(final ShardingRule shardingRule, final String tableName) {
- TableRule tableRule = shardingRule.getTableRule(tableName);
- // TODO support virtual data source name
- return tableRule.getActualDataNodes();
- }
-}
diff --git a/proxy/frontend/core/src/main/java/org/apache/shardingsphere/proxy/frontend/netty/CDCChannelInboundHandler.java b/proxy/frontend/core/src/main/java/org/apache/shardingsphere/proxy/frontend/netty/CDCChannelInboundHandler.java
index 00d05403f55..fa97f2059ac 100644
--- a/proxy/frontend/core/src/main/java/org/apache/shardingsphere/proxy/frontend/netty/CDCChannelInboundHandler.java
+++ b/proxy/frontend/core/src/main/java/org/apache/shardingsphere/proxy/frontend/netty/CDCChannelInboundHandler.java
@@ -32,6 +32,7 @@ import org.apache.shardingsphere.data.pipeline.cdc.exception.CDCExceptionWrapper
import org.apache.shardingsphere.data.pipeline.cdc.exception.CDCLoginException;
import org.apache.shardingsphere.data.pipeline.cdc.exception.CDCServerException;
import org.apache.shardingsphere.data.pipeline.cdc.generator.CDCResponseGenerator;
+import org.apache.shardingsphere.data.pipeline.cdc.handler.CDCBackendHandler;
import org.apache.shardingsphere.data.pipeline.cdc.protocol.request.AckStreamingRequestBody;
import org.apache.shardingsphere.data.pipeline.cdc.protocol.request.CDCRequest;
import org.apache.shardingsphere.data.pipeline.cdc.protocol.request.CommitStreamingRequestBody;
@@ -52,7 +53,6 @@ import org.apache.shardingsphere.infra.util.exception.ShardingSpherePrecondition
import org.apache.shardingsphere.infra.util.exception.external.sql.ShardingSphereSQLException;
import org.apache.shardingsphere.infra.util.exception.external.sql.sqlstate.XOpenSQLState;
import org.apache.shardingsphere.proxy.backend.context.ProxyContext;
-import org.apache.shardingsphere.data.pipeline.cdc.handler.CDCBackendHandler;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
@@ -100,7 +100,7 @@ public final class CDCChannelInboundHandler extends ChannelInboundHandlerAdapter
ShardingSphereSQLException exception = wrapper.getException();
channelFuture = ctx.writeAndFlush(CDCResponseGenerator.failed(wrapper.getRequestId(), exception.toSQLException().getSQLState(), exception.getMessage()));
} else {
- channelFuture = ctx.writeAndFlush(CDCResponseGenerator.failed("", XOpenSQLState.GENERAL_ERROR.getValue(), cause.getMessage()));
+ channelFuture = ctx.writeAndFlush(CDCResponseGenerator.failed("", XOpenSQLState.GENERAL_ERROR.getValue(), String.valueOf(cause.getMessage())));
}
if (CDCConnectionStatus.NOT_LOGGED_IN == connectionContext.getStatus()) {
channelFuture.addListener(ChannelFutureListener.CLOSE);