You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@shardingsphere.apache.org by zh...@apache.org on 2023/06/27 08:59:19 UTC

[shardingsphere] branch master updated: Extract CDC build data node method (#26616)

This is an automated email from the ASF dual-hosted git repository.

zhonghongsheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new 5897a62e557 Extract CDC build data node method (#26616)
5897a62e557 is described below

commit 5897a62e557d801173dcbac2c2c336bc631a03ca
Author: Xinze Guo <10...@users.noreply.github.com>
AuthorDate: Tue Jun 27 16:59:13 2023 +0800

    Extract CDC build data node method (#26616)
    
    * Extract CDC get data node method
    
    * Improve E2E table column definition
    
    * Revert "Improve E2E table column definition"
    
    This reverts commit a731d7b2285175acafa0e03469ba201cabb5004e.
---
 .../pipeline/cdc/handler/CDCBackendHandler.java    | 29 +---------
 .../data/pipeline/cdc/util/CDCDataNodeUtils.java   | 66 ++++++++++++++++++++++
 2 files changed, 68 insertions(+), 27 deletions(-)

diff --git a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/handler/CDCBackendHandler.java b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/handler/CDCBackendHandler.java
index 67d2d167209..a083e753ee4 100644
--- a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/handler/CDCBackendHandler.java
+++ b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/handler/CDCBackendHandler.java
@@ -40,6 +40,7 @@ import org.apache.shardingsphere.data.pipeline.cdc.protocol.request.StreamDataRe
 import org.apache.shardingsphere.data.pipeline.cdc.protocol.request.StreamDataRequestBody.SchemaTable;
 import org.apache.shardingsphere.data.pipeline.cdc.protocol.response.CDCResponse;
 import org.apache.shardingsphere.data.pipeline.cdc.protocol.response.StreamDataResult;
+import org.apache.shardingsphere.data.pipeline.cdc.util.CDCDataNodeUtils;
 import org.apache.shardingsphere.data.pipeline.cdc.util.CDCSchemaTableUtils;
 import org.apache.shardingsphere.data.pipeline.common.context.PipelineContextManager;
 import org.apache.shardingsphere.data.pipeline.core.exception.job.PipelineJobNotFoundException;
@@ -49,18 +50,12 @@ import org.apache.shardingsphere.infra.database.type.dialect.OpenGaussDatabaseTy
 import org.apache.shardingsphere.infra.datanode.DataNode;
 import org.apache.shardingsphere.infra.metadata.database.ShardingSphereDatabase;
 import org.apache.shardingsphere.infra.util.exception.ShardingSpherePreconditions;
-import org.apache.shardingsphere.sharding.rule.ShardingRule;
-import org.apache.shardingsphere.sharding.rule.TableRule;
-import org.apache.shardingsphere.single.rule.SingleRule;
 
-import java.util.ArrayList;
 import java.util.Collection;
-import java.util.HashMap;
 import java.util.HashSet;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
-import java.util.Optional;
 import java.util.Properties;
 import java.util.Set;
 import java.util.stream.Collectors;
@@ -109,7 +104,7 @@ public final class CDCBackendHandler {
             tableNames = schemaTableNames;
         }
         ShardingSpherePreconditions.checkState(!tableNames.isEmpty(), () -> new CDCExceptionWrapper(requestId, new NotFindStreamDataSourceTableException()));
-        Map<String, List<DataNode>> actualDataNodesMap = buildDataNodesMap(database, tableNames);
+        Map<String, List<DataNode>> actualDataNodesMap = CDCDataNodeUtils.buildDataNodesMap(database, tableNames);
         ShardingSpherePreconditions.checkState(!actualDataNodesMap.isEmpty(), () -> new PipelineInvalidParameterException(String.format("Not find table %s", tableNames)));
         boolean decodeWithTx = database.getProtocolType() instanceof OpenGaussDatabaseType;
         StreamDataParameter parameter = new StreamDataParameter(requestBody.getDatabase(), new LinkedList<>(schemaTableNames), requestBody.getFull(), actualDataNodesMap, decodeWithTx);
@@ -119,26 +114,6 @@ public final class CDCBackendHandler {
         return CDCResponseGenerator.succeedBuilder(requestId).setStreamDataResult(StreamDataResult.newBuilder().setStreamingId(jobId).build()).build();
     }
     
-    private Map<String, List<DataNode>> buildDataNodesMap(final ShardingSphereDatabase database, final Collection<String> tableNames) {
-        Optional<ShardingRule> shardingRule = database.getRuleMetaData().findSingleRule(ShardingRule.class);
-        Optional<SingleRule> singleRule = database.getRuleMetaData().findSingleRule(SingleRule.class);
-        Map<String, List<DataNode>> result = new HashMap<>();
-        // TODO support virtual data source name
-        for (String each : tableNames) {
-            if (singleRule.isPresent() && singleRule.get().getAllDataNodes().containsKey(each)) {
-                result.put(each, new ArrayList<>(singleRule.get().getAllDataNodes().get(each)));
-                continue;
-            }
-            if (shardingRule.isPresent() && shardingRule.get().findTableRule(each).isPresent()) {
-                TableRule tableRule = shardingRule.get().getTableRule(each);
-                result.put(each, tableRule.getActualDataNodes());
-                continue;
-            }
-            throw new PipelineInvalidParameterException(String.format("Not find actual data nodes of `%s`", each));
-        }
-        return result;
-    }
-    
     /**
      * Start streaming.
      *
diff --git a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/util/CDCDataNodeUtils.java b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/util/CDCDataNodeUtils.java
new file mode 100644
index 00000000000..adbdea6ad9a
--- /dev/null
+++ b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/util/CDCDataNodeUtils.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.data.pipeline.cdc.util;
+
+import org.apache.shardingsphere.data.pipeline.core.exception.param.PipelineInvalidParameterException;
+import org.apache.shardingsphere.infra.datanode.DataNode;
+import org.apache.shardingsphere.infra.metadata.database.ShardingSphereDatabase;
+import org.apache.shardingsphere.sharding.rule.ShardingRule;
+import org.apache.shardingsphere.sharding.rule.TableRule;
+import org.apache.shardingsphere.single.rule.SingleRule;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+
+/**
+ * CDC data node utils.
+ */
+public final class CDCDataNodeUtils {
+    
+    /**
+     * Build data nodes map.
+     *
+     * @param database database
+     * @param tableNames table names
+     * @return data nodes map
+     * @throws PipelineInvalidParameterException if the actual data nodes of a table cannot be found.
+     */
+    public static Map<String, List<DataNode>> buildDataNodesMap(final ShardingSphereDatabase database, final Collection<String> tableNames) {
+        Optional<ShardingRule> shardingRule = database.getRuleMetaData().findSingleRule(ShardingRule.class);
+        Optional<SingleRule> singleRule = database.getRuleMetaData().findSingleRule(SingleRule.class);
+        Map<String, List<DataNode>> result = new HashMap<>();
+        // TODO support virtual data source name
+        for (String each : tableNames) {
+            if (singleRule.isPresent() && singleRule.get().getAllDataNodes().containsKey(each)) {
+                result.put(each, new ArrayList<>(singleRule.get().getAllDataNodes().get(each)));
+                continue;
+            }
+            if (shardingRule.isPresent() && shardingRule.get().findTableRule(each).isPresent()) {
+                TableRule tableRule = shardingRule.get().getTableRule(each);
+                result.put(each, tableRule.getActualDataNodes());
+                continue;
+            }
+            throw new PipelineInvalidParameterException(String.format("Not find actual data nodes of `%s`", each));
+        }
+        return result;
+    }
+}