You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by zh...@apache.org on 2022/03/02 03:20:40 UTC

[rocketmq-connect] 32/43: fix(jdbc-connect) removed unused class

This is an automated email from the ASF dual-hosted git repository.

zhoubo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-connect.git

commit 7ebfedf7e367028c5bf413415794f72c08700956
Author: 翊名 <du...@alibaba-inc.com>
AuthorDate: Mon Mar 30 12:53:24 2020 +0800

    fix(jdbc-connect) removed unused class
---
 .../rocketmq/connect/jdbc/config/Config.java       | 31 ++++++++
 .../jdbc/strategy/DivideTaskByConsistentHash.java  | 82 ----------------------
 .../jdbc/connector/JdbcSourceConnectorTest.java    |  4 +-
 3 files changed, 32 insertions(+), 85 deletions(-)

diff --git a/src/main/java/org/apache/rocketmq/connect/jdbc/config/Config.java b/src/main/java/org/apache/rocketmq/connect/jdbc/config/Config.java
index ae86d3f..7b4aca3 100644
--- a/src/main/java/org/apache/rocketmq/connect/jdbc/config/Config.java
+++ b/src/main/java/org/apache/rocketmq/connect/jdbc/config/Config.java
@@ -78,6 +78,9 @@ public class Config {
     private long timestampDelayInterval = 0;
     private String dbTimezone = "GMT+8";
     private String queueName;
+    private String jdbcUrl;
+    private String jdbcUsername;
+    private String jdbcPassword;
 
     private Logger log = LoggerFactory.getLogger(Config.class);
     public static final Set<String> REQUEST_CONFIG = new HashSet<String>() {
@@ -322,4 +325,32 @@ public class Config {
     public void setWhiteTable(String whiteTable) {
         this.whiteTable = whiteTable;
     }
+
+    public void setPollInterval(long pollInterval) {
+        this.pollInterval = pollInterval;
+    }
+
+    public String getJdbcUrl() {
+        return jdbcUrl;
+    }
+
+    public void setJdbcUrl(String jdbcUrl) {
+        this.jdbcUrl = jdbcUrl;
+    }
+
+    public String getJdbcUsername() {
+        return jdbcUsername;
+    }
+
+    public void setJdbcUsername(String jdbcUsername) {
+        this.jdbcUsername = jdbcUsername;
+    }
+
+    public String getJdbcPassword() {
+        return jdbcPassword;
+    }
+
+    public void setJdbcPassword(String jdbcPassword) {
+        this.jdbcPassword = jdbcPassword;
+    }
 }
\ No newline at end of file
diff --git a/src/main/java/org/apache/rocketmq/connect/jdbc/strategy/DivideTaskByConsistentHash.java b/src/main/java/org/apache/rocketmq/connect/jdbc/strategy/DivideTaskByConsistentHash.java
deleted file mode 100644
index bac7358..0000000
--- a/src/main/java/org/apache/rocketmq/connect/jdbc/strategy/DivideTaskByConsistentHash.java
+++ /dev/null
@@ -1,82 +0,0 @@
-package org.apache.rocketmq.connect.jdbc.strategy;/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-import com.alibaba.fastjson.JSONObject;
-import io.openmessaging.KeyValue;
-import io.openmessaging.internal.DefaultKeyValue;
-import org.apache.rocketmq.common.consistenthash.ConsistentHashRouter;
-import org.apache.rocketmq.common.consistenthash.Node;
-import org.apache.rocketmq.replicator.config.DataType;
-import org.apache.rocketmq.replicator.config.TaskConfigEnum;
-import org.apache.rocketmq.replicator.config.TaskDivideConfig;
-import org.apache.rocketmq.replicator.config.TaskTopicInfo;
-
-import java.util.*;
-
-public class DivideTaskByConsistentHash extends TaskDivideStrategy {
-    @Override public List<KeyValue> divide(Map<String, List<TaskTopicInfo>> topicMap, TaskDivideConfig tdc) {
-
-        List<KeyValue> config = new ArrayList<>();
-        int parallelism = tdc.getTaskParallelism();
-        Map<Integer, List<TaskTopicInfo>> queueTopicList = new HashMap<>();
-        int id = -1;
-
-        Collection<ClientNode> cidNodes = new ArrayList<>();
-        for (int i = 0; i < parallelism; i++) {
-            cidNodes.add(new ClientNode(i, Integer.toString(i)));
-            queueTopicList.put(i, new ArrayList<>());
-        }
-
-        ConsistentHashRouter<ClientNode> router = new ConsistentHashRouter<>(cidNodes, cidNodes.size());
-
-        for (String t : topicMap.keySet()) {
-            for (TaskTopicInfo queue : topicMap.get(t)) {
-                ClientNode clientNode = router.routeNode(queue.toString());
-                if (clientNode != null) {
-                    queueTopicList.get(clientNode.index).add(queue);
-                }
-            }
-        }
-
-        for (int i = 0; i < parallelism; i++) {
-            KeyValue keyValue = new DefaultKeyValue();
-            keyValue.put(TaskConfigEnum.TASK_STORE_ROCKETMQ.getKey(), tdc.getStoreTopic());
-            keyValue.put(TaskConfigEnum.TASK_SOURCE_ROCKETMQ.getKey(), tdc.getSourceNamesrvAddr());
-            keyValue.put(TaskConfigEnum.TASK_DATA_TYPE.getKey(), DataType.COMMON_MESSAGE.ordinal());
-            keyValue.put(TaskConfigEnum.TASK_TOPIC_INFO.getKey(), JSONObject.toJSONString(queueTopicList.get(i)));
-            keyValue.put(TaskConfigEnum.TASK_SOURCE_RECORD_CONVERTER.getKey(), tdc.getSrcRecordConverter());
-            config.add(keyValue);
-        }
-
-        return config;
-    }
-
-    private static class ClientNode implements Node {
-        private final String clientID;
-        private final int index;
-
-        public ClientNode(int index, String clientID) {
-            this.index = index;
-            this.clientID = clientID;
-        }
-
-        @Override
-        public String getKey() {
-            return clientID;
-        }
-    }
-}
diff --git a/src/test/java/org/apache/rocketmq/connect/jdbc/connector/JdbcSourceConnectorTest.java b/src/test/java/org/apache/rocketmq/connect/jdbc/connector/JdbcSourceConnectorTest.java
index 1e7cf78..5d25f98 100644
--- a/src/test/java/org/apache/rocketmq/connect/jdbc/connector/JdbcSourceConnectorTest.java
+++ b/src/test/java/org/apache/rocketmq/connect/jdbc/connector/JdbcSourceConnectorTest.java
@@ -22,9 +22,7 @@ import static org.junit.Assert.assertEquals;
 import java.util.HashSet;
 import java.util.Set;
 
-import org.apache.rocketmq.connect.jdbc.Config;
-import org.apache.rocketmq.connect.jdbc.connector.JdbcSourceTask;
-
+import org.apache.rocketmq.connect.jdbc.config.Config;
 import org.junit.Test;
 
 import io.openmessaging.KeyValue;