You are viewing a plain text version of this content. The canonical link to the original is not included in this copy.
Posted to notifications@shardingsphere.apache.org by zh...@apache.org on 2023/05/08 11:40:40 UTC

[shardingsphere] branch master updated: CDC support single table (#25489)

This is an automated email from the ASF dual-hosted git repository.

zhonghongsheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new 6d3dfdfedd9 CDC support single table (#25489)
6d3dfdfedd9 is described below

commit 6d3dfdfedd9271ad9f48e663352357926f39e448
Author: Xinze Guo <10...@users.noreply.github.com>
AuthorDate: Mon May 8 19:40:23 2023 +0800

    CDC support single table (#25489)
    
    * CDC support single table
    
    * Add invalid parameter check
---
 .../pipeline/cdc/handler/CDCBackendHandler.java    | 32 +++++++++++----
 .../pipeline/cdc/util/CDCSchemaTableUtils.java     |  4 +-
 .../data/pipeline/cdc/util/CDCTableRuleUtils.java  | 46 ----------------------
 .../frontend/netty/CDCChannelInboundHandler.java   |  4 +-
 4 files changed, 29 insertions(+), 57 deletions(-)

diff --git a/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/handler/CDCBackendHandler.java b/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/handler/CDCBackendHandler.java
index 04d86bea04f..e5fe2db24bf 100644
--- a/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/handler/CDCBackendHandler.java
+++ b/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/handler/CDCBackendHandler.java
@@ -42,15 +42,17 @@ import org.apache.shardingsphere.data.pipeline.cdc.protocol.request.StreamDataRe
 import org.apache.shardingsphere.data.pipeline.cdc.protocol.response.CDCResponse;
 import org.apache.shardingsphere.data.pipeline.cdc.protocol.response.StreamDataResult;
 import org.apache.shardingsphere.data.pipeline.cdc.util.CDCSchemaTableUtils;
-import org.apache.shardingsphere.data.pipeline.cdc.util.CDCTableRuleUtils;
 import org.apache.shardingsphere.data.pipeline.core.context.PipelineContextManager;
 import org.apache.shardingsphere.data.pipeline.core.exception.job.PipelineJobNotFoundException;
+import org.apache.shardingsphere.data.pipeline.core.exception.metadata.NoAnyRuleExistsException;
+import org.apache.shardingsphere.data.pipeline.core.exception.param.PipelineInvalidParameterException;
 import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobCenter;
 import org.apache.shardingsphere.infra.database.type.dialect.OpenGaussDatabaseType;
 import org.apache.shardingsphere.infra.datanode.DataNode;
 import org.apache.shardingsphere.infra.metadata.database.ShardingSphereDatabase;
 import org.apache.shardingsphere.infra.util.exception.ShardingSpherePreconditions;
 import org.apache.shardingsphere.sharding.rule.ShardingRule;
+import org.apache.shardingsphere.single.rule.SingleRule;
 
 import java.sql.SQLException;
 import java.util.ArrayList;
@@ -110,12 +112,8 @@ public final class CDCBackendHandler {
             tableNames = schemaTableNames;
         }
         ShardingSpherePreconditions.checkState(!tableNames.isEmpty(), () -> new CDCExceptionWrapper(requestId, new NotFindStreamDataSourceTableException()));
-        ShardingRule shardingRule = database.getRuleMetaData().getSingleRule(ShardingRule.class);
-        Map<String, List<DataNode>> actualDataNodesMap = new HashMap<>();
-        // TODO need support case-insensitive later
-        for (String each : tableNames) {
-            actualDataNodesMap.put(each, CDCTableRuleUtils.getActualDataNodes(shardingRule, each));
-        }
+        Map<String, List<DataNode>> actualDataNodesMap = buildDataNodesMap(database, tableNames);
+        ShardingSpherePreconditions.checkState(!actualDataNodesMap.isEmpty(), () -> new PipelineInvalidParameterException(String.format("Not find table %s", tableNames)));
         boolean decodeWithTx = database.getProtocolType() instanceof OpenGaussDatabaseType;
         StreamDataParameter parameter = new StreamDataParameter(requestBody.getDatabase(), new LinkedList<>(schemaTableNames), requestBody.getFull(), actualDataNodesMap, decodeWithTx);
         String jobId = jobAPI.createJob(parameter, CDCSinkType.SOCKET, new Properties());
@@ -124,6 +122,24 @@ public final class CDCBackendHandler {
         return CDCResponseGenerator.succeedBuilder(requestId).setStreamDataResult(StreamDataResult.newBuilder().setStreamingId(jobId).build()).build();
     }
     
+    private Map<String, List<DataNode>> buildDataNodesMap(final ShardingSphereDatabase database, final Collection<String> tableNames) {
+        Map<String, List<DataNode>> result = new HashMap<>();
+        Optional<ShardingRule> shardingRule = database.getRuleMetaData().findSingleRule(ShardingRule.class);
+        Optional<SingleRule> singleRule = database.getRuleMetaData().findSingleRule(SingleRule.class);
+        if (!shardingRule.isPresent() && !singleRule.isPresent()) {
+            throw new NoAnyRuleExistsException(database.getName());
+        }
+        // TODO support virtual data source name
+        for (String each : tableNames) {
+            if (singleRule.isPresent() && singleRule.get().getAllDataNodes().containsKey(each)) {
+                result.put(each, new ArrayList<>(singleRule.get().getAllDataNodes().get(each)));
+                continue;
+            }
+            shardingRule.flatMap(value -> value.findTableRule(each)).ifPresent(rule -> result.put(each, rule.getActualDataNodes()));
+        }
+        return result;
+    }
+    
     /**
      * Start streaming.
      *
@@ -148,7 +164,7 @@ public final class CDCBackendHandler {
     /**
      * Stop streaming.
      *
-     * @param jobId     job id
+     * @param jobId job id
      * @param channelId channel id
      */
     public void stopStreaming(final String jobId, final ChannelId channelId) {
diff --git a/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/util/CDCSchemaTableUtils.java b/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/util/CDCSchemaTableUtils.java
index d2b82b4de0c..34373ece4eb 100644
--- a/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/util/CDCSchemaTableUtils.java
+++ b/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/util/CDCSchemaTableUtils.java
@@ -28,6 +28,7 @@ import org.apache.shardingsphere.infra.metadata.database.schema.model.ShardingSp
 import org.apache.shardingsphere.infra.util.exception.ShardingSpherePreconditions;
 
 import java.util.Collection;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
@@ -99,7 +100,8 @@ public final class CDCSchemaTableUtils {
      */
     public static Collection<String> parseTableExpressionWithoutSchema(final ShardingSphereDatabase database, final List<String> tableNames) {
         Optional<String> allTablesOptional = tableNames.stream().filter("*"::equals).findFirst();
-        Set<String> allTableNames = new HashSet<>(database.getSchema(database.getName()).getAllTableNames());
+        ShardingSphereSchema schema = database.getSchema(database.getName());
+        Set<String> allTableNames = null == schema ? Collections.emptySet() : new HashSet<>(schema.getAllTableNames());
         return allTablesOptional.isPresent() ? allTableNames : new HashSet<>(tableNames);
     }
 }
diff --git a/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/util/CDCTableRuleUtils.java b/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/util/CDCTableRuleUtils.java
deleted file mode 100644
index e3360346ac4..00000000000
--- a/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/util/CDCTableRuleUtils.java
+++ /dev/null
@@ -1,46 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.shardingsphere.data.pipeline.cdc.util;
-
-import lombok.AccessLevel;
-import lombok.NoArgsConstructor;
-import org.apache.shardingsphere.infra.datanode.DataNode;
-import org.apache.shardingsphere.sharding.rule.ShardingRule;
-import org.apache.shardingsphere.sharding.rule.TableRule;
-
-import java.util.List;
-
-/**
- * CDC table rule utility class.
- */
-@NoArgsConstructor(access = AccessLevel.PRIVATE)
-public final class CDCTableRuleUtils {
-    
-    /**
-     * Get actual data nodes.
-     *
-     * @param shardingRule sharding rule
-     * @param tableName table name
-     * @return data nodes
-     */
-    public static List<DataNode> getActualDataNodes(final ShardingRule shardingRule, final String tableName) {
-        TableRule tableRule = shardingRule.getTableRule(tableName);
-        // TODO support virtual data source name
-        return tableRule.getActualDataNodes();
-    }
-}
diff --git a/proxy/frontend/core/src/main/java/org/apache/shardingsphere/proxy/frontend/netty/CDCChannelInboundHandler.java b/proxy/frontend/core/src/main/java/org/apache/shardingsphere/proxy/frontend/netty/CDCChannelInboundHandler.java
index 00d05403f55..fa97f2059ac 100644
--- a/proxy/frontend/core/src/main/java/org/apache/shardingsphere/proxy/frontend/netty/CDCChannelInboundHandler.java
+++ b/proxy/frontend/core/src/main/java/org/apache/shardingsphere/proxy/frontend/netty/CDCChannelInboundHandler.java
@@ -32,6 +32,7 @@ import org.apache.shardingsphere.data.pipeline.cdc.exception.CDCExceptionWrapper
 import org.apache.shardingsphere.data.pipeline.cdc.exception.CDCLoginException;
 import org.apache.shardingsphere.data.pipeline.cdc.exception.CDCServerException;
 import org.apache.shardingsphere.data.pipeline.cdc.generator.CDCResponseGenerator;
+import org.apache.shardingsphere.data.pipeline.cdc.handler.CDCBackendHandler;
 import org.apache.shardingsphere.data.pipeline.cdc.protocol.request.AckStreamingRequestBody;
 import org.apache.shardingsphere.data.pipeline.cdc.protocol.request.CDCRequest;
 import org.apache.shardingsphere.data.pipeline.cdc.protocol.request.CommitStreamingRequestBody;
@@ -52,7 +53,6 @@ import org.apache.shardingsphere.infra.util.exception.ShardingSpherePrecondition
 import org.apache.shardingsphere.infra.util.exception.external.sql.ShardingSphereSQLException;
 import org.apache.shardingsphere.infra.util.exception.external.sql.sqlstate.XOpenSQLState;
 import org.apache.shardingsphere.proxy.backend.context.ProxyContext;
-import org.apache.shardingsphere.data.pipeline.cdc.handler.CDCBackendHandler;
 
 import java.net.InetSocketAddress;
 import java.net.SocketAddress;
@@ -100,7 +100,7 @@ public final class CDCChannelInboundHandler extends ChannelInboundHandlerAdapter
             ShardingSphereSQLException exception = wrapper.getException();
             channelFuture = ctx.writeAndFlush(CDCResponseGenerator.failed(wrapper.getRequestId(), exception.toSQLException().getSQLState(), exception.getMessage()));
         } else {
-            channelFuture = ctx.writeAndFlush(CDCResponseGenerator.failed("", XOpenSQLState.GENERAL_ERROR.getValue(), cause.getMessage()));
+            channelFuture = ctx.writeAndFlush(CDCResponseGenerator.failed("", XOpenSQLState.GENERAL_ERROR.getValue(), String.valueOf(cause.getMessage())));
         }
         if (CDCConnectionStatus.NOT_LOGGED_IN == connectionContext.getStatus()) {
             channelFuture.addListener(ChannelFutureListener.CLOSE);