You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by zh...@apache.org on 2022/03/02 03:20:37 UTC
[rocketmq-connect] 29/43: [ISSUE #489] JDBC Connector support divide task by topic strategy (#490)
This is an automated email from the ASF dual-hosted git repository.
zhoubo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-connect.git
commit 341d6f356047b0fe908a2aad1ca901cdd7317aef
Author: Xiongmengfei <Xi...@163.com>
AuthorDate: Tue Dec 31 15:21:39 2019 +0800
[ISSUE #489] JDBC Connector support divide task by topic strategy (#490)
* [ISSUE #489] JDBC Connector support divide task by topic strategy
* [ISSUE #489] JDBC Connector support divide task by topic strategy
---
.../rocketmq/connect/jdbc/config/Config.java | 70 +++++++++----
.../rocketmq/connect/jdbc/config/DataType.java | 26 +++++
.../connect/jdbc/config/DbConnectorConfig.java | 84 ++++++++++++++++
.../connect/jdbc/config/SinkDbConnectorConfig.java | 67 ++++++++++++
.../jdbc/config/SourceDbConnectorConfig.java | 73 ++++++++++++++
.../connect/jdbc/config/TaskDivideConfig.java | 112 +++++++++++++++++++++
.../connect/jdbc/config/TaskTopicInfo.java | 37 +++++++
.../connect/jdbc/connector/JdbcSinkConnector.java | 39 +++++--
.../connect/jdbc/connector/JdbcSinkTask.java | 1 +
.../jdbc/connector/JdbcSourceConnector.java | 37 +++++--
.../connect/jdbc/connector/JdbcSourceTask.java | 13 +--
.../connect/jdbc/strategy/DivideStrategyEnum.java | 23 +++++
.../jdbc/strategy/DivideTaskByConsistentHash.java | 82 +++++++++++++++
.../connect/jdbc/strategy/DivideTaskByQueue.java | 62 ++++++++++++
.../connect/jdbc/strategy/DivideTaskByTopic.java | 104 +++++++++++++++++++
.../connect/jdbc/strategy/TaskDivideStrategy.java | 32 ++++++
16 files changed, 822 insertions(+), 40 deletions(-)
diff --git a/src/main/java/org/apache/rocketmq/connect/jdbc/config/Config.java b/src/main/java/org/apache/rocketmq/connect/jdbc/config/Config.java
index 5491d43..91a3e51 100644
--- a/src/main/java/org/apache/rocketmq/connect/jdbc/config/Config.java
+++ b/src/main/java/org/apache/rocketmq/connect/jdbc/config/Config.java
@@ -17,6 +17,7 @@
package org.apache.rocketmq.connect.jdbc.config;
+import com.alibaba.fastjson.JSONObject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -28,9 +29,11 @@ public class Config {
private static final Logger LOG = LoggerFactory.getLogger(Config.class);
/* Database Connection Config */
- private String jdbcUrl;
- private String jdbcUsername;
- private String jdbcPassword;
+ private String dbUrl;
+ private String dbPort;
+ private String dbUsername;
+ private String dbPassword;
+ private String dataType;
private String rocketmqTopic;
private String jdbcBackoff;
private String jdbcAttempts;
@@ -44,6 +47,18 @@ public class Config {
private String whiteDataBase;
private String whiteTable;
+ public static final String CONN_TASK_PARALLELISM = "task-parallelism";
+ public static final String CONN_TASK_DIVIDE_STRATEGY = "task-divide-strategy";
+ public static final String CONN_WHITE_LIST = "whiteDataBase";
+ public static final String CONN_SOURCE_RECORD_CONVERTER = "source-record-converter";
+ public static final String CONN_DB_IP = "dbUrl";
+ public static final String CONN_DB_PORT = "dbPort";
+ public static final String CONN_DB_USERNAME = "dbUsername";
+ public static final String CONN_DB_PASSWORD = "dbPassword";
+ public static final String CONN_DATA_TYPE = "dataType";
+ public static final String CONN_TOPIC_NAMES = "topicNames";
+ public static final String CONN_DB_MODE = "mode";
+
/* Mode Config */
private String mode = "";
private String incrementingColumnName = "";
@@ -57,15 +72,16 @@ public class Config {
private int batchMaxRows = 100;
private long tablePollInterval = 60000;
private long timestampDelayInterval = 0;
- private String dbTimezone = "UTC";
+ private String dbTimezone = "GMT+8";
private String queueName;
private Logger log = LoggerFactory.getLogger(Config.class);
public static final Set<String> REQUEST_CONFIG = new HashSet<String>() {
{
- add("jdbcUrl");
- add("jdbcUsername");
- add("jdbcPassword");
+ add("dbUrl");
+ add("dbPort");
+ add("dbUsername");
+ add("dbPassword");
add("mode");
add("rocketmqTopic");
}
@@ -79,28 +95,44 @@ public class Config {
this.queueName = queueName;
}
- public String getJdbcUrl() {
- return jdbcUrl;
+ public String getDbUrl() {
+ return dbUrl;
+ }
+
+ public void setDbUrl(String dbUrl) {
+ this.dbUrl = dbUrl;
+ }
+
+ public String getDbPort() {
+ return dbPort;
+ }
+
+ public void setDbPort(String dbPort) {
+ this.dbPort = dbPort;
+ }
+
+ public String getDbUsername() {
+ return dbUsername;
}
- public void setJdbcUrl(String jdbcUrl) {
- this.jdbcUrl = jdbcUrl;
+ public void setDbUsername(String dbUsername) {
+ this.dbUsername = dbUsername;
}
- public String getJdbcUsername() {
- return jdbcUsername;
+ public String getDbPassword() {
+ return dbPassword;
}
- public void setJdbcUsername(String jdbcUsername) {
- this.jdbcUsername = jdbcUsername;
+ public void setDbPassword(String dbPassword) {
+ this.dbPassword = dbPassword;
}
- public String getJdbcPassword() {
- return jdbcPassword;
+ public String getDataType() {
+ return dataType;
}
- public void setJdbcPassword(String jdbcPassword) {
- this.jdbcPassword = jdbcPassword;
+ public void setDataType(String dataType) {
+ this.dataType = dataType;
}
public String getRocketmqTopic() {
diff --git a/src/main/java/org/apache/rocketmq/connect/jdbc/config/DataType.java b/src/main/java/org/apache/rocketmq/connect/jdbc/config/DataType.java
new file mode 100644
index 0000000..ef7408a
--- /dev/null
+++ b/src/main/java/org/apache/rocketmq/connect/jdbc/config/DataType.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.connect.jdbc.config;
+
+public enum DataType {
+
+ COMMON_MESSAGE,
+ TOPIC_CONFIG,
+ BROKER_CONFIG,
+ SUB_CONFIG,
+ OFFSET
+}
diff --git a/src/main/java/org/apache/rocketmq/connect/jdbc/config/DbConnectorConfig.java b/src/main/java/org/apache/rocketmq/connect/jdbc/config/DbConnectorConfig.java
new file mode 100644
index 0000000..43bd165
--- /dev/null
+++ b/src/main/java/org/apache/rocketmq/connect/jdbc/config/DbConnectorConfig.java
@@ -0,0 +1,84 @@
+package org.apache.rocketmq.connect.jdbc.config;
+
+import io.openmessaging.KeyValue;
+import org.apache.rocketmq.connect.jdbc.strategy.TaskDivideStrategy;
+
+public abstract class DbConnectorConfig {
+
+ public TaskDivideStrategy taskDivideStrategy;
+ public String dbUrl;
+ public String dbPort;
+ public String dbUserName;
+ public String dbPassword;
+ public String converter;
+ public int taskParallelism;
+ public String mode;
+
+ public abstract void validate(KeyValue config);
+
+ public abstract <T> T getWhiteTopics();
+
+ public TaskDivideStrategy getTaskDivideStrategy() {
+ return taskDivideStrategy;
+ }
+
+ public void setTaskDivideStrategy(TaskDivideStrategy taskDivideStrategy) {
+ this.taskDivideStrategy = taskDivideStrategy;
+ }
+
+ public String getDbUrl() {
+ return dbUrl;
+ }
+
+ public void setDbUrl(String dbUrl) {
+ this.dbUrl = dbUrl;
+ }
+
+ public String getDbPort() {
+ return dbPort;
+ }
+
+ public void setDbPort(String dbPort) {
+ this.dbPort = dbPort;
+ }
+
+ public String getDbUserName() {
+ return dbUserName;
+ }
+
+ public void setDbUserName(String dbUserName) {
+ this.dbUserName = dbUserName;
+ }
+
+ public String getDbPassword() {
+ return dbPassword;
+ }
+
+ public void setDbPassword(String dbPassword) {
+ this.dbPassword = dbPassword;
+ }
+
+ public String getConverter() {
+ return converter;
+ }
+
+ public void setConverter(String converter) {
+ this.converter = converter;
+ }
+
+ public int getTaskParallelism() {
+ return taskParallelism;
+ }
+
+ public void setTaskParallelism(int taskParallelism) {
+ this.taskParallelism = taskParallelism;
+ }
+
+ public String getMode() {
+ return mode;
+ }
+
+ public void setMode(String mode) {
+ this.mode = mode;
+ }
+}
diff --git a/src/main/java/org/apache/rocketmq/connect/jdbc/config/SinkDbConnectorConfig.java b/src/main/java/org/apache/rocketmq/connect/jdbc/config/SinkDbConnectorConfig.java
new file mode 100644
index 0000000..851b253
--- /dev/null
+++ b/src/main/java/org/apache/rocketmq/connect/jdbc/config/SinkDbConnectorConfig.java
@@ -0,0 +1,67 @@
+package org.apache.rocketmq.connect.jdbc.config;
+
+import io.openmessaging.KeyValue;
+import org.apache.rocketmq.connect.jdbc.strategy.DivideStrategyEnum;
+import org.apache.rocketmq.connect.jdbc.strategy.DivideTaskByQueue;
+import org.apache.rocketmq.connect.jdbc.strategy.DivideTaskByTopic;
+
+import java.util.HashSet;
+import java.util.Set;
+
+public class SinkDbConnectorConfig extends DbConnectorConfig{
+
+ private Set<String> whiteList;
+
+ public SinkDbConnectorConfig(){
+ }
+
+ @Override
+ public void validate(KeyValue config) {
+ this.taskParallelism = config.getInt(Config.CONN_TASK_PARALLELISM, 0);
+
+ int strategy = config.getInt(Config.CONN_TASK_DIVIDE_STRATEGY, DivideStrategyEnum.BY_QUEUE.ordinal());
+ if (strategy == DivideStrategyEnum.BY_QUEUE.ordinal()) {
+ this.taskDivideStrategy = new DivideTaskByQueue();
+ } else {
+ this.taskDivideStrategy = new DivideTaskByTopic();
+ }
+
+ buildWhiteList(config);
+
+ this.converter = config.getString(Config.CONN_SOURCE_RECORD_CONVERTER);
+ this.dbUrl = config.getString(Config.CONN_DB_IP);
+ this.dbPort = config.getString(Config.CONN_DB_PORT);
+ this.dbUserName = config.getString(Config.CONN_DB_USERNAME);
+ this.dbPassword = config.getString(Config.CONN_DB_PASSWORD);
+
+ }
+
+ private void buildWhiteList(KeyValue config) {
+ this.whiteList = new HashSet<>();
+ String whiteListStr = config.getString(Config.CONN_WHITE_LIST, "");
+ String[] wl = whiteListStr.trim().split(",");
+ if (wl.length <= 0)
+ throw new IllegalArgumentException("White list must be not empty.");
+ else {
+ this.whiteList.clear();
+ for (String t : wl) {
+ this.whiteList.add(t.trim());
+ }
+ }
+ }
+
+
+ public Set<String> getWhiteList() {
+ return whiteList;
+ }
+
+ public void setWhiteList(Set<String> whiteList) {
+ this.whiteList = whiteList;
+ }
+
+ @Override
+ public Set<String> getWhiteTopics() {
+ return getWhiteList();
+ }
+
+}
diff --git a/src/main/java/org/apache/rocketmq/connect/jdbc/config/SourceDbConnectorConfig.java b/src/main/java/org/apache/rocketmq/connect/jdbc/config/SourceDbConnectorConfig.java
new file mode 100644
index 0000000..801e411
--- /dev/null
+++ b/src/main/java/org/apache/rocketmq/connect/jdbc/config/SourceDbConnectorConfig.java
@@ -0,0 +1,73 @@
+package org.apache.rocketmq.connect.jdbc.config;
+
+import com.alibaba.fastjson.JSONObject;
+import io.openmessaging.KeyValue;
+import org.apache.rocketmq.connect.jdbc.strategy.DivideStrategyEnum;
+import org.apache.rocketmq.connect.jdbc.strategy.DivideTaskByQueue;
+import org.apache.rocketmq.connect.jdbc.strategy.DivideTaskByTopic;
+
+import java.util.HashMap;
+import java.util.Map;
+
+public class SourceDbConnectorConfig extends DbConnectorConfig{
+
+ private Map<String, String> whiteMap;
+
+ public SourceDbConnectorConfig(){
+ }
+
+ @Override
+ public void validate(KeyValue config) {
+ this.taskParallelism = config.getInt(Config.CONN_TASK_PARALLELISM, 0);
+
+ int strategy = config.getInt(Config.CONN_TASK_DIVIDE_STRATEGY, DivideStrategyEnum.BY_QUEUE.ordinal());
+ if (strategy == DivideStrategyEnum.BY_QUEUE.ordinal()) {
+ this.taskDivideStrategy = new DivideTaskByQueue();
+ } else {
+ this.taskDivideStrategy = new DivideTaskByTopic();
+ }
+
+ buildWhiteMap(config);
+
+ this.converter = config.getString(Config.CONN_SOURCE_RECORD_CONVERTER);
+ this.dbUrl = config.getString(Config.CONN_DB_IP);
+ this.dbPort = config.getString(Config.CONN_DB_PORT);
+ this.dbUserName = config.getString(Config.CONN_DB_USERNAME);
+ this.dbPassword = config.getString(Config.CONN_DB_PASSWORD);
+ this.mode = config.getString(Config.CONN_DB_MODE, "bulk");
+
+ }
+
+ private void buildWhiteMap(KeyValue config) {
+ this.whiteMap = new HashMap<>(16);
+ String whiteListStr = config.getString(Config.CONN_WHITE_LIST, "");
+ JSONObject whiteDataBaseObject = JSONObject.parseObject(whiteListStr);
+ if(whiteDataBaseObject.keySet().size() <= 0){
+ throw new IllegalArgumentException("white data base must be not empty.");
+ }else {
+ this.whiteMap.clear();
+ for (String dbName : whiteDataBaseObject.keySet()){
+ JSONObject whiteTableObject = (JSONObject) whiteDataBaseObject.get(dbName);
+ for (String tableName : whiteTableObject.keySet()){
+ String dbTableKey = dbName + "-" + tableName;
+ this.whiteMap.put(dbTableKey, whiteTableObject.getString(tableName));
+ }
+ }
+ }
+ }
+
+
+ public Map<String, String> getWhiteMap() {
+ return whiteMap;
+ }
+
+ public void setWhiteMap(Map<String, String> whiteMap) {
+ this.whiteMap = whiteMap;
+ }
+
+ @Override
+ public Map<String, String> getWhiteTopics() {
+ return getWhiteMap();
+ }
+
+}
diff --git a/src/main/java/org/apache/rocketmq/connect/jdbc/config/TaskDivideConfig.java b/src/main/java/org/apache/rocketmq/connect/jdbc/config/TaskDivideConfig.java
new file mode 100644
index 0000000..8b15a2f
--- /dev/null
+++ b/src/main/java/org/apache/rocketmq/connect/jdbc/config/TaskDivideConfig.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.connect.jdbc.config;
+
+public class TaskDivideConfig {
+
+ private String dbUrl;
+
+ private String dbPort;
+
+ private String dbUserName;
+
+ private String dbPassword;
+
+ private String srcRecordConverter;
+
+ private int dataType;
+
+ private int taskParallelism;
+
+ private String mode;
+
+ public TaskDivideConfig(String dbUrl, String dbPort, String dbUserName, String dbPassword, String srcRecordConverter,
+ int dataType, int taskParallelism, String mode) {
+ this.dbUrl = dbUrl;
+ this.dbPort = dbPort;
+ this.dbUserName = dbUserName;
+ this.dbPassword = dbPassword;
+ this.srcRecordConverter = srcRecordConverter;
+ this.dataType = dataType;
+ this.taskParallelism = taskParallelism;
+ this.mode = mode;
+ }
+
+ public String getDbUrl() {
+ return dbUrl;
+ }
+
+ public void setDbUrl(String dbUrl) {
+ this.dbUrl = dbUrl;
+ }
+
+ public String getDbPort() {
+ return dbPort;
+ }
+
+ public void setDbPort(String dbPort) {
+ this.dbPort = dbPort;
+ }
+
+ public String getDbUserName() {
+ return dbUserName;
+ }
+
+ public void setDbUserName(String dbUserName) {
+ this.dbUserName = dbUserName;
+ }
+
+ public String getDbPassword() {
+ return dbPassword;
+ }
+
+ public void setDbPassword(String dbPassword) {
+ this.dbPassword = dbPassword;
+ }
+
+ public String getSrcRecordConverter() {
+ return srcRecordConverter;
+ }
+
+ public void setSrcRecordConverter(String srcRecordConverter) {
+ this.srcRecordConverter = srcRecordConverter;
+ }
+
+ public int getDataType() {
+ return dataType;
+ }
+
+ public void setDataType(int dataType) {
+ this.dataType = dataType;
+ }
+
+ public int getTaskParallelism() {
+ return taskParallelism;
+ }
+
+ public void setTaskParallelism(int taskParallelism) {
+ this.taskParallelism = taskParallelism;
+ }
+
+ public String getMode() {
+ return mode;
+ }
+
+ public void setMode(String mode) {
+ this.mode = mode;
+ }
+}
diff --git a/src/main/java/org/apache/rocketmq/connect/jdbc/config/TaskTopicInfo.java b/src/main/java/org/apache/rocketmq/connect/jdbc/config/TaskTopicInfo.java
new file mode 100644
index 0000000..5c2a21e
--- /dev/null
+++ b/src/main/java/org/apache/rocketmq/connect/jdbc/config/TaskTopicInfo.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.connect.jdbc.config;
+
+import org.apache.rocketmq.common.message.MessageQueue;
+
+public class TaskTopicInfo extends MessageQueue {
+
+ private String targetTopic;
+
+ public TaskTopicInfo(String sourceTopic, String brokerName, int queueId, String targetTopic) {
+ super(sourceTopic, brokerName, queueId);
+ this.targetTopic = targetTopic;
+ }
+
+ public String getTargetTopic() {
+ return this.targetTopic;
+ }
+
+ public void setTargetTopic(String targetTopic) {
+ this.targetTopic = targetTopic;
+ }
+}
diff --git a/src/main/java/org/apache/rocketmq/connect/jdbc/connector/JdbcSinkConnector.java b/src/main/java/org/apache/rocketmq/connect/jdbc/connector/JdbcSinkConnector.java
index e1d1256..935ad52 100644
--- a/src/main/java/org/apache/rocketmq/connect/jdbc/connector/JdbcSinkConnector.java
+++ b/src/main/java/org/apache/rocketmq/connect/jdbc/connector/JdbcSinkConnector.java
@@ -3,15 +3,23 @@ package org.apache.rocketmq.connect.jdbc.connector;
import io.openmessaging.KeyValue;
import io.openmessaging.connector.api.Task;
import io.openmessaging.connector.api.sink.SinkConnector;
-import org.apache.rocketmq.connect.jdbc.config.Config;
+import org.apache.rocketmq.connect.jdbc.config.*;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.List;
public class JdbcSinkConnector extends SinkConnector {
-
+ private static final Logger log = LoggerFactory.getLogger(JdbcSinkConnector.class);
private KeyValue config;
+ private DbConnectorConfig dbConnectorConfig;
+ private volatile boolean configValid = false;
+
+ public JdbcSinkConnector(){
+ dbConnectorConfig = new SinkDbConnectorConfig();
+ }
@Override
public String verifyAndSetConfig(KeyValue config) {
@@ -20,7 +28,13 @@ public class JdbcSinkConnector extends SinkConnector {
return "Request config key: " + requestKey;
}
}
- this.config = config;
+ try {
+ this.dbConnectorConfig.validate(config);
+ } catch (IllegalArgumentException e) {
+ return e.getMessage();
+ }
+ this.configValid = true;
+
return "";
}
@@ -51,8 +65,21 @@ public class JdbcSinkConnector extends SinkConnector {
@Override
public List<KeyValue> taskConfigs() {
- List<KeyValue> config = new ArrayList<>();
- config.add(this.config);
- return config;
+ log.info("List.start");
+ if (!configValid) {
+ return new ArrayList<KeyValue>();
+ }
+
+ TaskDivideConfig tdc = new TaskDivideConfig(
+ this.dbConnectorConfig.getDbUrl(),
+ this.dbConnectorConfig.getDbPort(),
+ this.dbConnectorConfig.getDbUserName(),
+ this.dbConnectorConfig.getDbPassword(),
+ this.dbConnectorConfig.getConverter(),
+ DataType.COMMON_MESSAGE.ordinal(),
+ this.dbConnectorConfig.getTaskParallelism(),
+ this.dbConnectorConfig.getMode()
+ );
+ return this.dbConnectorConfig.getTaskDivideStrategy().divide(this.dbConnectorConfig, tdc);
}
}
diff --git a/src/main/java/org/apache/rocketmq/connect/jdbc/connector/JdbcSinkTask.java b/src/main/java/org/apache/rocketmq/connect/jdbc/connector/JdbcSinkTask.java
index 4b55e5a..31f43e3 100644
--- a/src/main/java/org/apache/rocketmq/connect/jdbc/connector/JdbcSinkTask.java
+++ b/src/main/java/org/apache/rocketmq/connect/jdbc/connector/JdbcSinkTask.java
@@ -114,6 +114,7 @@ public class JdbcSinkTask extends SinkTask {
try {
if (connection != null){
connection.close();
+ log.info("jdbc sink task connection is closed.");
}
} catch (Throwable e) {
log.warn("sink task stop error while closing connection to {}", "jdbc", e);
diff --git a/src/main/java/org/apache/rocketmq/connect/jdbc/connector/JdbcSourceConnector.java b/src/main/java/org/apache/rocketmq/connect/jdbc/connector/JdbcSourceConnector.java
index 796f0e6..a083e84 100644
--- a/src/main/java/org/apache/rocketmq/connect/jdbc/connector/JdbcSourceConnector.java
+++ b/src/main/java/org/apache/rocketmq/connect/jdbc/connector/JdbcSourceConnector.java
@@ -20,7 +20,7 @@ package org.apache.rocketmq.connect.jdbc.connector;
import java.util.ArrayList;
import java.util.List;
-import org.apache.rocketmq.connect.jdbc.config.Config;
+import org.apache.rocketmq.connect.jdbc.config.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -30,19 +30,29 @@ import io.openmessaging.connector.api.source.SourceConnector;
public class JdbcSourceConnector extends SourceConnector {
private static final Logger log = LoggerFactory.getLogger(JdbcSourceConnector.class);
- private KeyValue config;
+ private DbConnectorConfig dbConnectorConfig;
+ private volatile boolean configValid = false;
+
+ public JdbcSourceConnector() {
+ dbConnectorConfig = new SourceDbConnectorConfig();
+ }
@Override
public String verifyAndSetConfig(KeyValue config) {
- log.info("1216123 JdbcSourceConnector verifyAndSetConfig enter");
+ log.info("JdbcSourceConnector verifyAndSetConfig enter");
for (String requestKey : Config.REQUEST_CONFIG) {
if (!config.containsKey(requestKey)) {
return "Request config key: " + requestKey;
}
}
- this.config = config;
+ try {
+ this.dbConnectorConfig.validate(config);
+ } catch (IllegalArgumentException e) {
+ return e.getMessage();
+ }
+ this.configValid = true;
return "";
}
@@ -75,8 +85,21 @@ public class JdbcSourceConnector extends SourceConnector {
@Override
public List<KeyValue> taskConfigs() {
log.info("List.start");
- List<KeyValue> config = new ArrayList<>();
- config.add(this.config);
- return config;
+ if (!configValid) {
+ return new ArrayList<KeyValue>();
+ }
+
+ TaskDivideConfig tdc = new TaskDivideConfig(
+ this.dbConnectorConfig.getDbUrl(),
+ this.dbConnectorConfig.getDbPort(),
+ this.dbConnectorConfig.getDbUserName(),
+ this.dbConnectorConfig.getDbPassword(),
+ this.dbConnectorConfig.getConverter(),
+ DataType.COMMON_MESSAGE.ordinal(),
+ this.dbConnectorConfig.getTaskParallelism(),
+ this.dbConnectorConfig.getMode()
+ );
+ return this.dbConnectorConfig.getTaskDivideStrategy().divide(this.dbConnectorConfig, tdc);
}
+
}
diff --git a/src/main/java/org/apache/rocketmq/connect/jdbc/connector/JdbcSourceTask.java b/src/main/java/org/apache/rocketmq/connect/jdbc/connector/JdbcSourceTask.java
index 943d432..d533395 100644
--- a/src/main/java/org/apache/rocketmq/connect/jdbc/connector/JdbcSourceTask.java
+++ b/src/main/java/org/apache/rocketmq/connect/jdbc/connector/JdbcSourceTask.java
@@ -22,11 +22,7 @@ import io.openmessaging.connector.api.source.SourceTask;
import java.nio.ByteBuffer;
import java.sql.Connection;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.List;
-import java.util.Map;
-import java.util.Timer;
+import java.util.*;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
@@ -82,7 +78,7 @@ public class JdbcSourceTask extends SourceTask {
querier = tableQueue.poll(1000, TimeUnit.MILLISECONDS);
else
querier = tableQueue.peek();
- Timer timer = new java.util.Timer();
+ Timer timer = new Timer();
try {
Thread.currentThread();
Thread.sleep(1000);//毫秒
@@ -114,7 +110,7 @@ public class JdbcSourceTask extends SourceTask {
}
SourceDataEntry sourceDataEntry = dataEntryBuilder.buildSourceDataEntry(
- ByteBuffer.wrap(config.getJdbcUrl().getBytes("UTF-8")),
+ ByteBuffer.wrap((config.getDbUrl() + config.getDbPort()).getBytes("UTF-8")),
ByteBuffer.wrap(jsonObject.toJSONString().getBytes("UTF-8")));
res.add(sourceDataEntry);
log.debug("sourceDataEntry : {}", JSONObject.toJSONString(sourceDataEntry));
@@ -163,8 +159,9 @@ public class JdbcSourceTask extends SourceTask {
@Override
public void stop() {
try {
- if (connection != null){
+ if (connection != null) {
connection.close();
+ log.info("jdbc source task connection is closed.");
}
} catch (Throwable e) {
log.warn("source task stop error while closing connection to {}", "jdbc", e);
diff --git a/src/main/java/org/apache/rocketmq/connect/jdbc/strategy/DivideStrategyEnum.java b/src/main/java/org/apache/rocketmq/connect/jdbc/strategy/DivideStrategyEnum.java
new file mode 100644
index 0000000..0afa470
--- /dev/null
+++ b/src/main/java/org/apache/rocketmq/connect/jdbc/strategy/DivideStrategyEnum.java
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.connect.jdbc.strategy;
+
+public enum DivideStrategyEnum {
+
+ BY_TOPIC,
+ BY_QUEUE
+}
diff --git a/src/main/java/org/apache/rocketmq/connect/jdbc/strategy/DivideTaskByConsistentHash.java b/src/main/java/org/apache/rocketmq/connect/jdbc/strategy/DivideTaskByConsistentHash.java
new file mode 100644
index 0000000..bac7358
--- /dev/null
+++ b/src/main/java/org/apache/rocketmq/connect/jdbc/strategy/DivideTaskByConsistentHash.java
@@ -0,0 +1,82 @@
+package org.apache.rocketmq.connect.jdbc.strategy;/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import com.alibaba.fastjson.JSONObject;
+import io.openmessaging.KeyValue;
+import io.openmessaging.internal.DefaultKeyValue;
+import org.apache.rocketmq.common.consistenthash.ConsistentHashRouter;
+import org.apache.rocketmq.common.consistenthash.Node;
+import org.apache.rocketmq.replicator.config.DataType;
+import org.apache.rocketmq.replicator.config.TaskConfigEnum;
+import org.apache.rocketmq.replicator.config.TaskDivideConfig;
+import org.apache.rocketmq.replicator.config.TaskTopicInfo;
+
+import java.util.*;
+
+public class DivideTaskByConsistentHash extends TaskDivideStrategy {
+ @Override public List<KeyValue> divide(Map<String, List<TaskTopicInfo>> topicMap, TaskDivideConfig tdc) {
+
+ List<KeyValue> config = new ArrayList<>();
+ int parallelism = tdc.getTaskParallelism();
+ Map<Integer, List<TaskTopicInfo>> queueTopicList = new HashMap<>();
+ int id = -1;
+
+ Collection<ClientNode> cidNodes = new ArrayList<>();
+ for (int i = 0; i < parallelism; i++) {
+ cidNodes.add(new ClientNode(i, Integer.toString(i)));
+ queueTopicList.put(i, new ArrayList<>());
+ }
+
+ ConsistentHashRouter<ClientNode> router = new ConsistentHashRouter<>(cidNodes, cidNodes.size());
+
+ for (String t : topicMap.keySet()) {
+ for (TaskTopicInfo queue : topicMap.get(t)) {
+ ClientNode clientNode = router.routeNode(queue.toString());
+ if (clientNode != null) {
+ queueTopicList.get(clientNode.index).add(queue);
+ }
+ }
+ }
+
+ for (int i = 0; i < parallelism; i++) {
+ KeyValue keyValue = new DefaultKeyValue();
+ keyValue.put(TaskConfigEnum.TASK_STORE_ROCKETMQ.getKey(), tdc.getStoreTopic());
+ keyValue.put(TaskConfigEnum.TASK_SOURCE_ROCKETMQ.getKey(), tdc.getSourceNamesrvAddr());
+ keyValue.put(TaskConfigEnum.TASK_DATA_TYPE.getKey(), DataType.COMMON_MESSAGE.ordinal());
+ keyValue.put(TaskConfigEnum.TASK_TOPIC_INFO.getKey(), JSONObject.toJSONString(queueTopicList.get(i)));
+ keyValue.put(TaskConfigEnum.TASK_SOURCE_RECORD_CONVERTER.getKey(), tdc.getSrcRecordConverter());
+ config.add(keyValue);
+ }
+
+ return config;
+ }
+
+ private static class ClientNode implements Node {
+ private final String clientID;
+ private final int index;
+
+ public ClientNode(int index, String clientID) {
+ this.index = index;
+ this.clientID = clientID;
+ }
+
+ @Override
+ public String getKey() {
+ return clientID;
+ }
+ }
+}
diff --git a/src/main/java/org/apache/rocketmq/connect/jdbc/strategy/DivideTaskByQueue.java b/src/main/java/org/apache/rocketmq/connect/jdbc/strategy/DivideTaskByQueue.java
new file mode 100644
index 0000000..7ef5c31
--- /dev/null
+++ b/src/main/java/org/apache/rocketmq/connect/jdbc/strategy/DivideTaskByQueue.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.connect.jdbc.strategy;
+
+import com.alibaba.fastjson.JSONObject;
+import io.openmessaging.KeyValue;
+import io.openmessaging.internal.DefaultKeyValue;
+import org.apache.rocketmq.replicator.config.DataType;
+import org.apache.rocketmq.replicator.config.TaskConfigEnum;
+import org.apache.rocketmq.replicator.config.TaskDivideConfig;
+import org.apache.rocketmq.replicator.config.TaskTopicInfo;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Task-divide strategy that deals the queues out to the tasks one by one, in round-robin order.
+ *
+ * <p>NOTE(review): {@code TaskDivideStrategy} in this package declares
+ * {@code divide(DbConnectorConfig, TaskDivideConfig)}; the Map-based signature below does not
+ * match it, so the {@code @Override} looks like it cannot compile - confirm whether this class
+ * is still meant to be used by the JDBC connector.
+ */
+public class DivideTaskByQueue extends TaskDivideStrategy {
+    /**
+     * Assigns the queues found in {@code topicRouteMap} to {@code tdc.getTaskParallelism()}
+     * tasks in encounter order (first queue to task 0, second to task 1, and so on, wrapping
+     * around) and builds one configuration per task.
+     *
+     * <p>A task that receives no queue gets an empty topic-info list instead of a missing
+     * (null) one, matching what the consistent-hash strategy emits for an idle task.
+     *
+     * @param topicRouteMap queue lists to distribute; only the values are read
+     * @param tdc           supplies the parallelism and the settings copied into every task config
+     * @return one configuration per task index, in ascending index order
+     */
+    @Override
+    public List<KeyValue> divide(Map<String, List<TaskTopicInfo>> topicRouteMap, TaskDivideConfig tdc) {
+
+        int parallelism = tdc.getTaskParallelism();
+        Map<Integer, List<TaskTopicInfo>> queueTopicList = new HashMap<Integer, List<TaskTopicInfo>>();
+        int id = -1;
+        for (List<TaskTopicInfo> queues : topicRouteMap.values()) {
+            for (TaskTopicInfo taskTopicInfo : queues) {
+                int ind = ++id % parallelism;
+                List<TaskTopicInfo> bucket = queueTopicList.get(ind);
+                if (bucket == null) {
+                    bucket = new ArrayList<TaskTopicInfo>();
+                    queueTopicList.put(ind, bucket);
+                }
+                bucket.add(taskTopicInfo);
+            }
+        }
+
+        List<KeyValue> config = new ArrayList<KeyValue>();
+        for (int i = 0; i < parallelism; i++) {
+            // With fewer queues than tasks some indexes never got a bucket; serialise those as
+            // an empty list rather than as JSON null.
+            List<TaskTopicInfo> assigned = queueTopicList.get(i);
+            if (assigned == null) {
+                assigned = new ArrayList<TaskTopicInfo>();
+            }
+
+            KeyValue keyValue = new DefaultKeyValue();
+            keyValue.put(TaskConfigEnum.TASK_STORE_ROCKETMQ.getKey(), tdc.getStoreTopic());
+            keyValue.put(TaskConfigEnum.TASK_SOURCE_ROCKETMQ.getKey(), tdc.getSourceNamesrvAddr());
+            keyValue.put(TaskConfigEnum.TASK_DATA_TYPE.getKey(), DataType.COMMON_MESSAGE.ordinal());
+            keyValue.put(TaskConfigEnum.TASK_TOPIC_INFO.getKey(), JSONObject.toJSONString(assigned));
+            keyValue.put(TaskConfigEnum.TASK_SOURCE_RECORD_CONVERTER.getKey(), tdc.getSrcRecordConverter());
+            config.add(keyValue);
+        }
+
+        return config;
+    }
+}
diff --git a/src/main/java/org/apache/rocketmq/connect/jdbc/strategy/DivideTaskByTopic.java b/src/main/java/org/apache/rocketmq/connect/jdbc/strategy/DivideTaskByTopic.java
new file mode 100644
index 0000000..762c7a0
--- /dev/null
+++ b/src/main/java/org/apache/rocketmq/connect/jdbc/strategy/DivideTaskByTopic.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.connect.jdbc.strategy;
+
+import com.alibaba.fastjson.JSONObject;
+import io.openmessaging.KeyValue;
+import io.openmessaging.connector.api.Task;
+import io.openmessaging.internal.DefaultKeyValue;
+import org.apache.rocketmq.connect.jdbc.config.*;
+import org.apache.rocketmq.connect.jdbc.connector.JdbcSourceTask;
+
+import java.util.*;
+
+/**
+ * Task-divide strategy that hands whole topics to the tasks in round-robin order.
+ *
+ * <p>For a {@code SourceDbConnectorConfig} the whitelist entries are distributed; for any other
+ * connector config the sink topic names are distributed.
+ */
+public class DivideTaskByTopic extends TaskDivideStrategy {
+    /**
+     * Builds one configuration per task, choosing the source or the sink layout from the
+     * runtime type of {@code dbConnectorConfig}.
+     *
+     * @param dbConnectorConfig connector configuration holding the white topics
+     * @param tdc               supplies the parallelism and the database settings copied into
+     *                          every task config
+     * @return one configuration per task index, in ascending index order
+     */
+    @Override
+    public List<KeyValue> divide(DbConnectorConfig dbConnectorConfig, TaskDivideConfig tdc) {
+        if (dbConnectorConfig instanceof SourceDbConnectorConfig) {
+            return divideSourceTaskByTopic(dbConnectorConfig, tdc);
+        } else {
+            return divideSinkTaskByTopic(dbConnectorConfig, tdc);
+        }
+    }
+
+    /**
+     * Distributes the source whitelist over the tasks. Each whitelist key is split on
+     * {@code "-"}; the first piece is used as the database key and the second as the table key,
+     * and the entry value is stored as that table's filter.
+     *
+     * <p>Tables of the same database that land in the same task are merged into one map. The
+     * earlier code replaced the database's map on every entry, so only the last table survived.
+     */
+    private List<KeyValue> divideSourceTaskByTopic(DbConnectorConfig dbConnectorConfig, TaskDivideConfig tdc) {
+        List<KeyValue> config = new ArrayList<KeyValue>();
+        int parallelism = tdc.getTaskParallelism();
+        int id = -1;
+        Map<String, String> topicRouteMap = ((SourceDbConnectorConfig) dbConnectorConfig).getWhiteTopics();
+        // task index -> database key -> table key -> filter
+        Map<Integer, Map<String, Map<String, String>>> taskTopicList = new HashMap<>();
+        for (Map.Entry<String, String> entry : topicRouteMap.entrySet()) {
+            int ind = ++id % parallelism;
+            String[] dbAndTable = entry.getKey().split("-");
+            String dbKey = dbAndTable[0];
+            String tableKey = dbAndTable[1];
+            taskTopicList
+                .computeIfAbsent(ind, k -> new HashMap<>())
+                .computeIfAbsent(dbKey, k -> new HashMap<>())
+                .put(tableKey, entry.getValue());
+        }
+
+        for (int i = 0; i < parallelism; i++) {
+            KeyValue keyValue = new DefaultKeyValue();
+
+            keyValue.put(Config.CONN_DB_IP, tdc.getDbUrl());
+            keyValue.put(Config.CONN_DB_PORT, tdc.getDbPort());
+            keyValue.put(Config.CONN_DB_USERNAME, tdc.getDbUserName());
+            keyValue.put(Config.CONN_DB_PASSWORD, tdc.getDbPassword());
+            // A task index that received no whitelist entry has no map here (get returns null).
+            keyValue.put(Config.CONN_WHITE_LIST, JSONObject.toJSONString(taskTopicList.get(i)));
+            keyValue.put(Config.CONN_DATA_TYPE, tdc.getDataType());
+            keyValue.put(Config.CONN_SOURCE_RECORD_CONVERTER, tdc.getSrcRecordConverter());
+            keyValue.put(Config.CONN_DB_MODE, tdc.getMode());
+            config.add(keyValue);
+        }
+
+        return config;
+    }
+
+    /**
+     * Distributes the sink topic names over the tasks.
+     *
+     * <p>Every topic is kept: when there are more topics than tasks, the topics of one task are
+     * joined with {@code ","}. The earlier code stored only the first topic per task and
+     * silently dropped the rest. A task with exactly one topic gets the same value as before.
+     */
+    private List<KeyValue> divideSinkTaskByTopic(DbConnectorConfig dbConnectorConfig, TaskDivideConfig tdc) {
+        List<KeyValue> config = new ArrayList<KeyValue>();
+        int parallelism = tdc.getTaskParallelism();
+        int id = -1;
+        Set<String> topicRouteSet = ((SinkDbConnectorConfig) dbConnectorConfig).getWhiteTopics();
+        Map<Integer, List<String>> taskTopicList = new HashMap<>();
+        for (String topicName : topicRouteSet) {
+            int ind = ++id % parallelism;
+            taskTopicList.computeIfAbsent(ind, k -> new ArrayList<>()).add(topicName);
+        }
+
+        for (int i = 0; i < parallelism; i++) {
+            List<String> topics = taskTopicList.get(i);
+            // NOTE(review): "," is assumed to be the separator the reader of CONN_TOPIC_NAMES
+            // expects for several topics - confirm against the sink task.
+            // A task without any topic keeps the previous null value.
+            String topicNames = topics == null ? null : String.join(",", topics);
+
+            KeyValue keyValue = new DefaultKeyValue();
+            keyValue.put(Config.CONN_DB_IP, tdc.getDbUrl());
+            keyValue.put(Config.CONN_DB_PORT, tdc.getDbPort());
+            keyValue.put(Config.CONN_DB_USERNAME, tdc.getDbUserName());
+            keyValue.put(Config.CONN_DB_PASSWORD, tdc.getDbPassword());
+            keyValue.put(Config.CONN_TOPIC_NAMES, JSONObject.toJSONString(topicNames));
+            keyValue.put(Config.CONN_DATA_TYPE, tdc.getDataType());
+            keyValue.put(Config.CONN_SOURCE_RECORD_CONVERTER, tdc.getSrcRecordConverter());
+            keyValue.put(Config.CONN_DB_MODE, tdc.getMode());
+            config.add(keyValue);
+        }
+
+        return config;
+    }
+
+}
diff --git a/src/main/java/org/apache/rocketmq/connect/jdbc/strategy/TaskDivideStrategy.java b/src/main/java/org/apache/rocketmq/connect/jdbc/strategy/TaskDivideStrategy.java
new file mode 100644
index 0000000..736fcac
--- /dev/null
+++ b/src/main/java/org/apache/rocketmq/connect/jdbc/strategy/TaskDivideStrategy.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.connect.jdbc.strategy;
+
+import io.openmessaging.KeyValue;
+import io.openmessaging.connector.api.Task;
+import org.apache.rocketmq.connect.jdbc.config.DbConnectorConfig;
+import org.apache.rocketmq.connect.jdbc.config.TaskDivideConfig;
+import org.apache.rocketmq.connect.jdbc.config.TaskTopicInfo;
+
+import java.util.List;
+import java.util.Map;
+
+public abstract class TaskDivideStrategy { // Base class of the task-divide strategies; DivideTaskByQueue and DivideTaskByTopic extend it.
+
+ public abstract List<KeyValue> divide(DbConnectorConfig dbConnectorConfig, TaskDivideConfig tdc); // Implementations return a list of KeyValue task configurations; DivideTaskByTopic returns one per task index.
+
+}