You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by zh...@apache.org on 2022/03/02 03:19:16 UTC

[rocketmq-connect] branch master updated (0669f46 -> 386adcc)

This is an automated email from the ASF dual-hosted git repository.

zhoubo pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-connect.git.


    from 0669f46  Add 'connector/rocketmq-connect-activemq/' from commit 'e8aedf31c7eb4b26a2d9df7c766427fdfd845e9f'
     new e2cc843  [ISSUE #570] ASoC runtime optimization: Cassandra connectors (#587)
     new fafa276  Add 'connector/rocketmq-connect-cassandra/' from commit 'e2cc843ef4926a98797ca76880579941d5363fc6'
     new 44bb9bd  [ISSUE #801]Rocketmq connector sink for hudi (#800)
     new 5da4b78  fix hudi connect config
     new 386adcc  Add 'connector/rocketmq-connect-hudi/' from commit '5da4b78705108ac6d260283cd38f9be08d2590b9'

The 5 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 connector/rocketmq-connect-cassandra/README.md     |  97 +++++++
 .../pom.xml                                        | 123 +++++++--
 .../rocketmq-connect-cassandra/scripts/gen_data.py | 155 +++++++++++
 .../scripts/requirements.txt                       |   2 +
 .../connect/cassandra/common/CloneUtils.java       |  44 ++++
 .../connect/cassandra/common/ConstDefine.java      |  17 +-
 .../rocketmq/connect/cassandra/common/DBUtils.java |  91 +++++++
 .../connect/cassandra/common/DataType.java         |  20 +-
 .../rocketmq/connect/cassandra/common/Utils.java   |  76 ++++++
 .../rocketmq/connect/cassandra/config/Config.java  | 282 +++++++++++++++++++++
 .../connect/cassandra/config/ConfigUtil.java       |  24 +-
 .../cassandra/config/DbConnectorConfig.java        | 110 ++++++++
 .../cassandra/config/SinkDbConnectorConfig.java    | 112 ++++++++
 .../cassandra/config/SourceDbConnectorConfig.java  |  87 +++++++
 .../connect/cassandra/config/TaskDivideConfig.java | 123 +++++++++
 .../connect/cassandra/config/TaskTopicInfo.java    |  84 +++---
 .../connector/CassandraSinkConnector.java          | 240 ++++++++++++++++++
 .../cassandra/connector/CassandraSinkTask.java     | 161 ++++++++++++
 .../connector/CassandraSourceConnector.java        | 108 ++++++++
 .../cassandra/connector/CassandraSourceTask.java   | 168 ++++++++++++
 .../connect/cassandra/schema/Database.java         | 140 ++++++++++
 .../rocketmq/connect/cassandra/schema/Schema.java  | 146 +++++++++++
 .../rocketmq/connect/cassandra/schema/Table.java   | 103 ++++++++
 .../schema/column/BigIntColumnParser.java          |  43 ++--
 .../schema/column/BooleanColumnParser.java         |  33 +--
 .../cassandra/schema/column/ColumnParser.java      | 118 +++++++++
 .../schema/column/DateTimeColumnParser.java        |  53 ++++
 .../schema/column/DefaultColumnParser.java         |  32 +--
 .../cassandra/schema/column/EnumColumnParser.java  |  43 ++--
 .../cassandra/schema/column/IntColumnParser.java   |  66 +++++
 .../cassandra/schema/column/SetColumnParser.java   |  54 ++++
 .../schema/column/StringColumnParser.java          |  57 +++++
 .../cassandra/schema/column/TimeColumnParser.java  |  32 +--
 .../cassandra/schema/column/YearColumnParser.java  |  35 +--
 .../rocketmq/connect/cassandra/sink/Updater.java   | 216 ++++++++++++++++
 .../rocketmq/connect/cassandra/source/Querier.java | 164 ++++++++++++
 .../cassandra/strategy/DivideStrategyEnum.java     |  17 +-
 .../cassandra/strategy/DivideTaskByTopic.java      | 110 ++++++++
 .../cassandra/strategy/TaskDivideStrategy.java     |  33 +--
 connector/rocketmq-connect-hudi/README.md          |  77 ++++++
 .../rocketmq-connect-hudi}/pom.xml                 | 228 ++++++++++-------
 .../rocketmq/connect/hudi/config/CloneUtils.java   |  50 ++++
 .../rocketmq/connect/hudi/config/ConfigUtil.java   |  24 +-
 .../connect/hudi/config/HudiConnectConfig.java     | 173 +++++++++++++
 .../connect/hudi/config/SinkConnectConfig.java     | 139 ++++++++++
 .../apache/rocketmq/connect/hudi/config/Utils.java |  75 ++++++
 .../connect/hudi/connector/HudiSinkConnector.java  | 250 ++++++++++++++++++
 .../connect/hudi/connector/HudiSinkTask.java       | 111 ++++++++
 .../apache/rocketmq/connect/hudi/sink/Updater.java | 239 +++++++++++++++++
 .../connect/hudi/strategy/ITaskDivideStrategy.java |  28 +-
 .../hudi/strategy/TaskDivideByQueueStrategy.java   |  80 ++++++
 .../hudi/strategy/TaskDivideStrategyFactory.java   |   9 +-
 .../style}/rmq_checkstyle.xml                      |   6 -
 53 files changed, 4713 insertions(+), 395 deletions(-)
 create mode 100644 connector/rocketmq-connect-cassandra/README.md
 copy connector/{rocketmq-connect-activemq => rocketmq-connect-cassandra}/pom.xml (64%)
 create mode 100644 connector/rocketmq-connect-cassandra/scripts/gen_data.py
 create mode 100644 connector/rocketmq-connect-cassandra/scripts/requirements.txt
 create mode 100644 connector/rocketmq-connect-cassandra/src/main/java/org/apache/rocketmq/connect/cassandra/common/CloneUtils.java
 copy rocketmq-connect-sample/src/main/java/org/apache/rocketmq/connect/file/FileConstants.java => connector/rocketmq-connect-cassandra/src/main/java/org/apache/rocketmq/connect/cassandra/common/ConstDefine.java (66%)
 create mode 100644 connector/rocketmq-connect-cassandra/src/main/java/org/apache/rocketmq/connect/cassandra/common/DBUtils.java
 copy rocketmq-connect-sample/src/main/java/org/apache/rocketmq/connect/file/FileConstants.java => connector/rocketmq-connect-cassandra/src/main/java/org/apache/rocketmq/connect/cassandra/common/DataType.java (66%)
 create mode 100644 connector/rocketmq-connect-cassandra/src/main/java/org/apache/rocketmq/connect/cassandra/common/Utils.java
 create mode 100644 connector/rocketmq-connect-cassandra/src/main/java/org/apache/rocketmq/connect/cassandra/config/Config.java
 copy rocketmq-connect-sample/src/main/java/org/apache/rocketmq/connect/file/FileUtils.java => connector/rocketmq-connect-cassandra/src/main/java/org/apache/rocketmq/connect/cassandra/config/ConfigUtil.java (79%)
 create mode 100644 connector/rocketmq-connect-cassandra/src/main/java/org/apache/rocketmq/connect/cassandra/config/DbConnectorConfig.java
 create mode 100644 connector/rocketmq-connect-cassandra/src/main/java/org/apache/rocketmq/connect/cassandra/config/SinkDbConnectorConfig.java
 create mode 100644 connector/rocketmq-connect-cassandra/src/main/java/org/apache/rocketmq/connect/cassandra/config/SourceDbConnectorConfig.java
 create mode 100644 connector/rocketmq-connect-cassandra/src/main/java/org/apache/rocketmq/connect/cassandra/config/TaskDivideConfig.java
 copy rocketmq-connect-cli/src/main/java/org/apache/rocketmq/connect/cli/commom/Config.java => connector/rocketmq-connect-cassandra/src/main/java/org/apache/rocketmq/connect/cassandra/config/TaskTopicInfo.java (60%)
 create mode 100644 connector/rocketmq-connect-cassandra/src/main/java/org/apache/rocketmq/connect/cassandra/connector/CassandraSinkConnector.java
 create mode 100644 connector/rocketmq-connect-cassandra/src/main/java/org/apache/rocketmq/connect/cassandra/connector/CassandraSinkTask.java
 create mode 100644 connector/rocketmq-connect-cassandra/src/main/java/org/apache/rocketmq/connect/cassandra/connector/CassandraSourceConnector.java
 create mode 100644 connector/rocketmq-connect-cassandra/src/main/java/org/apache/rocketmq/connect/cassandra/connector/CassandraSourceTask.java
 create mode 100644 connector/rocketmq-connect-cassandra/src/main/java/org/apache/rocketmq/connect/cassandra/schema/Database.java
 create mode 100644 connector/rocketmq-connect-cassandra/src/main/java/org/apache/rocketmq/connect/cassandra/schema/Schema.java
 create mode 100644 connector/rocketmq-connect-cassandra/src/main/java/org/apache/rocketmq/connect/cassandra/schema/Table.java
 copy rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/connectorwrapper/testimpl/TestTask.java => connector/rocketmq-connect-cassandra/src/main/java/org/apache/rocketmq/connect/cassandra/schema/column/BigIntColumnParser.java (54%)
 copy rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/connectorwrapper/testimpl/TestTask.java => connector/rocketmq-connect-cassandra/src/main/java/org/apache/rocketmq/connect/cassandra/schema/column/BooleanColumnParser.java (67%)
 create mode 100644 connector/rocketmq-connect-cassandra/src/main/java/org/apache/rocketmq/connect/cassandra/schema/column/ColumnParser.java
 create mode 100644 connector/rocketmq-connect-cassandra/src/main/java/org/apache/rocketmq/connect/cassandra/schema/column/DateTimeColumnParser.java
 copy rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/connectorwrapper/testimpl/TestTask.java => connector/rocketmq-connect-cassandra/src/main/java/org/apache/rocketmq/connect/cassandra/schema/column/DefaultColumnParser.java (67%)
 copy rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/connectorwrapper/testimpl/TestTask.java => connector/rocketmq-connect-cassandra/src/main/java/org/apache/rocketmq/connect/cassandra/schema/column/EnumColumnParser.java (59%)
 create mode 100644 connector/rocketmq-connect-cassandra/src/main/java/org/apache/rocketmq/connect/cassandra/schema/column/IntColumnParser.java
 create mode 100644 connector/rocketmq-connect-cassandra/src/main/java/org/apache/rocketmq/connect/cassandra/schema/column/SetColumnParser.java
 create mode 100644 connector/rocketmq-connect-cassandra/src/main/java/org/apache/rocketmq/connect/cassandra/schema/column/StringColumnParser.java
 copy rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/connectorwrapper/testimpl/TestTask.java => connector/rocketmq-connect-cassandra/src/main/java/org/apache/rocketmq/connect/cassandra/schema/column/TimeColumnParser.java (67%)
 copy rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/connectorwrapper/testimpl/TestTask.java => connector/rocketmq-connect-cassandra/src/main/java/org/apache/rocketmq/connect/cassandra/schema/column/YearColumnParser.java (63%)
 create mode 100644 connector/rocketmq-connect-cassandra/src/main/java/org/apache/rocketmq/connect/cassandra/sink/Updater.java
 create mode 100644 connector/rocketmq-connect-cassandra/src/main/java/org/apache/rocketmq/connect/cassandra/source/Querier.java
 copy rocketmq-connect-sample/src/main/java/org/apache/rocketmq/connect/file/FileConstants.java => connector/rocketmq-connect-cassandra/src/main/java/org/apache/rocketmq/connect/cassandra/strategy/DivideStrategyEnum.java (66%)
 create mode 100644 connector/rocketmq-connect-cassandra/src/main/java/org/apache/rocketmq/connect/cassandra/strategy/DivideTaskByTopic.java
 copy rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/connectorwrapper/testimpl/TestTask.java => connector/rocketmq-connect-cassandra/src/main/java/org/apache/rocketmq/connect/cassandra/strategy/TaskDivideStrategy.java (66%)
 create mode 100644 connector/rocketmq-connect-hudi/README.md
 copy {rocketmq-connect-runtime => connector/rocketmq-connect-hudi}/pom.xml (55%)
 create mode 100644 connector/rocketmq-connect-hudi/src/main/java/org/apache/rocketmq/connect/hudi/config/CloneUtils.java
 copy rocketmq-connect-sample/src/main/java/org/apache/rocketmq/connect/file/FileUtils.java => connector/rocketmq-connect-hudi/src/main/java/org/apache/rocketmq/connect/hudi/config/ConfigUtil.java (79%)
 create mode 100644 connector/rocketmq-connect-hudi/src/main/java/org/apache/rocketmq/connect/hudi/config/HudiConnectConfig.java
 create mode 100644 connector/rocketmq-connect-hudi/src/main/java/org/apache/rocketmq/connect/hudi/config/SinkConnectConfig.java
 create mode 100644 connector/rocketmq-connect-hudi/src/main/java/org/apache/rocketmq/connect/hudi/config/Utils.java
 create mode 100644 connector/rocketmq-connect-hudi/src/main/java/org/apache/rocketmq/connect/hudi/connector/HudiSinkConnector.java
 create mode 100644 connector/rocketmq-connect-hudi/src/main/java/org/apache/rocketmq/connect/hudi/connector/HudiSinkTask.java
 create mode 100644 connector/rocketmq-connect-hudi/src/main/java/org/apache/rocketmq/connect/hudi/sink/Updater.java
 copy rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/connectorwrapper/testimpl/TestTask.java => connector/rocketmq-connect-hudi/src/main/java/org/apache/rocketmq/connect/hudi/strategy/ITaskDivideStrategy.java (68%)
 create mode 100644 connector/rocketmq-connect-hudi/src/main/java/org/apache/rocketmq/connect/hudi/strategy/TaskDivideByQueueStrategy.java
 copy rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/WorkerState.java => connector/rocketmq-connect-hudi/src/main/java/org/apache/rocketmq/connect/hudi/strategy/TaskDivideStrategyFactory.java (80%)
 copy {style => connector/rocketmq-connect-hudi/style}/rmq_checkstyle.xml (95%)

[rocketmq-connect] 01/05: [ISSUE #570] ASoC runtime optimization: Cassandra connectors (#587)

Posted by zh...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

zhoubo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-connect.git

commit e2cc843ef4926a98797ca76880579941d5363fc6
Author: affe <af...@gmail.com>
AuthorDate: Tue Jul 28 11:41:32 2020 +0800

    [ISSUE #570] ASoC runtime optimization: Cassandra connectors (#587)
    
    * feat(connect-cassanra) merge affe's cassandra connector implementation
    
    * Merge master into current branch : deleted .iml in connect-jms
    
    * style(connect-cassandra): resolve all TODOs and meaningless comments
---
 README.md                                          |  97 +++++++
 pom.xml                                            | 276 ++++++++++++++++++++
 scripts/gen_data.py                                | 155 +++++++++++
 scripts/requirements.txt                           |   2 +
 .../connect/cassandra/common/CloneUtils.java       |  44 ++++
 .../connect/cassandra/common/ConstDefine.java      |  23 ++
 .../rocketmq/connect/cassandra/common/DBUtils.java |  91 +++++++
 .../connect/cassandra/common/DataType.java         |  26 ++
 .../rocketmq/connect/cassandra/common/Utils.java   |  76 ++++++
 .../rocketmq/connect/cassandra/config/Config.java  | 282 +++++++++++++++++++++
 .../connect/cassandra/config/ConfigUtil.java       |  70 +++++
 .../cassandra/config/DbConnectorConfig.java        | 110 ++++++++
 .../cassandra/config/SinkDbConnectorConfig.java    | 112 ++++++++
 .../cassandra/config/SourceDbConnectorConfig.java  |  87 +++++++
 .../connect/cassandra/config/TaskDivideConfig.java | 123 +++++++++
 .../connect/cassandra/config/TaskTopicInfo.java    |  40 +++
 .../connector/CassandraSinkConnector.java          | 240 ++++++++++++++++++
 .../cassandra/connector/CassandraSinkTask.java     | 161 ++++++++++++
 .../connector/CassandraSourceConnector.java        | 108 ++++++++
 .../cassandra/connector/CassandraSourceTask.java   | 168 ++++++++++++
 .../connect/cassandra/schema/Database.java         | 140 ++++++++++
 .../rocketmq/connect/cassandra/schema/Schema.java  | 146 +++++++++++
 .../rocketmq/connect/cassandra/schema/Table.java   | 103 ++++++++
 .../schema/column/BigIntColumnParser.java          |  50 ++++
 .../schema/column/BooleanColumnParser.java         |  34 +++
 .../cassandra/schema/column/ColumnParser.java      | 118 +++++++++
 .../schema/column/DateTimeColumnParser.java        |  53 ++++
 .../schema/column/DefaultColumnParser.java         |  37 +++
 .../cassandra/schema/column/EnumColumnParser.java  |  46 ++++
 .../cassandra/schema/column/IntColumnParser.java   |  66 +++++
 .../cassandra/schema/column/SetColumnParser.java   |  54 ++++
 .../schema/column/StringColumnParser.java          |  57 +++++
 .../cassandra/schema/column/TimeColumnParser.java  |  39 +++
 .../cassandra/schema/column/YearColumnParser.java  |  40 +++
 .../rocketmq/connect/cassandra/sink/Updater.java   | 216 ++++++++++++++++
 .../rocketmq/connect/cassandra/source/Querier.java | 164 ++++++++++++
 .../cassandra/strategy/DivideStrategyEnum.java     |  23 ++
 .../cassandra/strategy/DivideTaskByTopic.java      | 110 ++++++++
 .../cassandra/strategy/TaskDivideStrategy.java     |  32 +++
 39 files changed, 3819 insertions(+)

diff --git a/README.md b/README.md
new file mode 100644
index 0000000..6439269
--- /dev/null
+++ b/README.md
@@ -0,0 +1,97 @@
+# rocketmq-connect-cassandra
+
+## rocketmq-connect-cassandra 打包
+```
+mvn clean install -DskipTest -U 
+```
+
+## 目前安装会遇到的问题
+
+目前的rocketmq-connect-cassandra 使用的是datastax-java-driver:4.5.0版本的cassandra-driver,由于在打包过程中还有没有解决的问题,该cassandra driver无法读取
+位于driver包中的默认配置文件,因此我们需要手动下载cassandra driver的配置文件[reference.conf](https://github.com/datastax/java-driver/blob/4.5.0/core/src/main/resources/reference.conf) 并将其放置于classpath中。
+
+该问题还仍然在解决的过程中。
+
+## rocketmq-connect-cassandra 启动
+
+* **cassandra-source-connector** 启动
+
+```
+http://${runtime-ip}:${runtime-port}/connectors/${rocketmq-cassandra-source-connector-name}
+?config={"connector-class":"org.apache.rocketmq.connect.cassandra.connector.JdbcSourceConnector",“dbUrl”:"${source-db-ip}",dbPort”:"${source-db-port}",dbUsername”:"${source-db-username}",dbPassword”:"${source-db-password}","rocketmqTopic":"cassandraTopic","mode":"bulk","whiteDataBase":{"${source-db-name}":{"${source-table-name}":{"${source-column-name}":"${source-column-value}"}}},"source-record-converter":"org.apache.rocketmq.connect.runtime.converter.JsonConverter"}
+```
+
+* **cassandra-sink-connector** 启动
+
+```
+http://${runtime-ip}:${runtime-port}/connectors/${rocketmq-cassandra-sink-connector-name}
+?config={"connector-class":"org.apache.rocketmq.connect.cassandra.connector.JdbcSinkConnector",“dbUrl”:"${sink-db-ip}",dbPort”:"${sink-db-port}",dbUsername”:"${sink-db-username}",dbPassword”:"${sink-db-password}","rocketmqTopic":"cassandraTopic","mode":"bulk","topicNames":"${sink-topic-name}","source-record-converter":"org.apache.rocketmq.connect.runtime.converter.JsonConverter"}
+```
+>**注:** `rocketmq-cassandra-connect` 的启动依赖于`rocketmq-connect-runtime`项目的启动,需将打好的`jar`包放置到`runtime`项目中`pluginPaths`配置的路径后再执行上面的启动请求,该值配置在`runtime`项目下的`connect.conf`文件中
+
+## rocketmq-connect-cassandra 停止
+
+```
+http://${runtime-ip}:${runtime-port}/connectors/${rocketmq-cassandra-connector-name}/stop
+```
+
+## rocketmq-connect-cassandra 参数说明
+* **cassandra-source-connector 参数说明**
+
+参数 | 类型 | 是否必须 | 描述 | 样例
+|---|---|---|---|---|
+|dbUrl | String | 是 | source端 DB ip | 192.168.1.2|
+|dbPort | String | 是 | source端 DB port | 3306 |
+|dbUsername | String | 是 | source端 DB 用户名 | root |
+|dbPassword | String | 是 | source端 DB 密码 | 123456 |
+|rocketmqTopic | String | 是 | 待废弃的参数,需和topicNames相同 | jdbc_cassandra |
+|topicNames | String | 是 | rocketmq默认每一个数据源中的表对应一个名字,该名称需和数据库表名称相同 | jdbc_cassandra |
+|whiteDataBase | String | 是 | source端同步数据白名单,嵌套配置,为{DB名:{表名:{字段名:字段值}}},若无指定字段数据同步,字段名可设为NO-FILTER,值为任意 | {"DATABASE_TEST":{"TEST_DATA":{"name":"test"}}} |
+|mode | String | 是 | source-connector 模式,目前仅支持bulk | bulk |
+|localDataCenter | String | 是 | 待废弃 | cassandra 集群的datacenter名称,为必填项 |
+|task-divide-strategy | Integer | 否 | task 分配策略, 默认值为 0,表示按照topic分配任务,每一个table便是一个topic | 0 |
+|task-parallelism | Integer | 否 | task parallelism,默认值为 1,表示将topic拆分为多少个任务进行执行 | 2 |
+|source-cluster | String | 是 | sink 端获取路由信息连接到的RocketMQ nameserver 地址 | 172.17.0.1:10911 |
+|source-rocketmq | String | 是 | sink 端获取路由信息连接到的RocketMQ broker cluster 地址 | 127.0.0.1:9876 |
+|source-record-converter | String | 是 | source data 解析 | org.apache.rocketmq.connect.runtime.converter.JsonConverter |
+
+
+示例配置如下
+```js
+{
+    "connector-class":"org.apache.rocketmq.connect.cassandra.connector.CassandraSourceConnector",
+    "rocketmqTopic":"jdbc_cassandra",
+    "topicNames": "jdbc_cassandra",
+    "dbUrl":"127.0.0.1",
+    "dbPort":"9042",
+    "dbUsername":"cassandra",
+    "dbPassword":"cassandra",
+    "localDataCenter":"datacenter1",
+    "whiteDataBase": {
+        "jdbc":{
+           "jdbc_cassandra": {"NO-FILTER": "10"}
+         }
+      },
+    "mode": "bulk",
+    "task-parallelism": 1,
+    "source-cluster": "172.17.0.1:10911",
+    "source-rocketmq": "127.0.0.1:9876",
+    "source-record-converter":"org.apache.rocketmq.connect.runtime.converter.JsonConverter"
+ }
+```
+* **cassandra-sink-connector 参数说明**
+
+参数 | 类型 | 是否必须 | 描述 | 样例
+|---|---|---|---|---|
+|dbUrl | String | 是 | sink端 DB ip | 192.168.1.2|
+|dbPort | String | 是 | sink端 DB port | 3306 |
+|dbUsername | String | 是 | sink端 DB 用户名 | root |
+|dbPassword | String | 是 | sink端 DB 密码 | 123456 |
+|topicNames | String | 是 | sink端同步数据的topic名字 | topic-1,topic-2 |
+|mode | String | 是 | source-connector 模式,目前仅支持bulk | bulk |
+|~~rocketmqTopic~~ | String | 是 | 待废弃 | cassandraTopic |
+|task-divide-strategy | Integer | 否 | task 分配策略, 默认值为 0,表示按照topic分配任务,每一个table便是一个topic | 0 |
+|task-parallelism | Integer | 否 | task parallelism,默认值为 1,表示将topic拆分为多少个任务进行执行 | 2 |
+|source-rocketmq | String | 是 | sink 端获取路由信息连接到的RocketMQ nameserver 地址 | 172.17.0.1:10911 |
+|source-cluster | String | 是 | sink 端获取路由信息连接到的RocketMQ broker cluster 地址 | 127.0.0.1:9876 |
+|source-record-converter | String | 是 | source data 解析 | org.apache.rocketmq.connect.runtime.converter.JsonConverter |
diff --git a/pom.xml b/pom.xml
new file mode 100644
index 0000000..286a6ef
--- /dev/null
+++ b/pom.xml
@@ -0,0 +1,276 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!-- Licensed to the Apache Software Foundation (ASF) under one or more contributor
+	license agreements. See the NOTICE file distributed with this work for additional
+	information regarding copyright ownership. The ASF licenses this file to
+	You under the Apache License, Version 2.0 (the "License"); you may not use
+	this file except in compliance with the License. You may obtain a copy of
+	the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required
+	by applicable law or agreed to in writing, software distributed under the
+	License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS
+	OF ANY KIND, either express or implied. See the License for the specific
+	language governing permissions and limitations under the License. -->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+    <groupId>org.apache.rocketmq</groupId>
+    <artifactId>rocketmq-connect-cassandra</artifactId>
+    <version>0.0.1-SNAPSHOT</version>
+
+    <name>connect-cassandra</name>
+    <licenses>
+        <license>
+            <name>The Apache Software License, Version 2.0</name>
+            <url>http://www.apache.org/licenses/LICENSE-2.0.txt</url>
+        </license>
+    </licenses>
+
+    <issueManagement>
+        <system>jira</system>
+        <url>https://issues.apache.org/jira/browse/RocketMQ</url>
+    </issueManagement>
+
+    <properties>
+        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+        <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
+
+        <!-- Compiler settings properties -->
+        <maven.compiler.source>1.8</maven.compiler.source>
+        <maven.compiler.target>1.8</maven.compiler.target>
+        <rocketmq.version>4.5.2</rocketmq.version>
+    </properties>
+
+    <build>
+        <plugins>
+            <plugin>
+                <groupId>org.codehaus.mojo</groupId>
+                <artifactId>versions-maven-plugin</artifactId>
+                <version>2.3</version>
+            </plugin>
+            <plugin>
+                <groupId>org.codehaus.mojo</groupId>
+                <artifactId>clirr-maven-plugin</artifactId>
+                <version>2.7</version>
+            </plugin>
+            <plugin>
+                <artifactId>maven-dependency-plugin</artifactId>
+                <configuration>
+                    <outputDirectory>${project.build.directory}/lib</outputDirectory>
+                    <excludeTransitive>false</excludeTransitive>
+                    <stripVersion>true</stripVersion>
+                </configuration>
+            </plugin>
+            <plugin>
+                <artifactId>maven-compiler-plugin</artifactId>
+                <version>3.6.1</version>
+                <configuration>
+                    <source>${maven.compiler.source}</source>
+                    <target>${maven.compiler.target}</target>
+                    <compilerVersion>${maven.compiler.source}</compilerVersion>
+                    <showDeprecation>true</showDeprecation>
+                    <showWarnings>true</showWarnings>
+                </configuration>
+            </plugin>
+            <plugin>
+                <artifactId>maven-surefire-plugin</artifactId>
+                <version>2.19.1</version>
+                <configuration>
+                    <argLine>-Xms512m -Xmx1024m</argLine>
+                    <forkMode>always</forkMode>
+                    <includes>
+                        <include>**/*Test.java</include>
+                    </includes>
+                </configuration>
+            </plugin>
+            <plugin>
+                <artifactId>maven-site-plugin</artifactId>
+                <version>3.6</version>
+                <configuration>
+                    <locales>en_US</locales>
+                    <outputEncoding>UTF-8</outputEncoding>
+                    <inputEncoding>UTF-8</inputEncoding>
+                </configuration>
+            </plugin>
+            <plugin>
+                <artifactId>maven-source-plugin</artifactId>
+                <version>3.0.1</version>
+                <executions>
+                    <execution>
+                        <id>attach-sources</id>
+                        <goals>
+                            <goal>jar</goal>
+                        </goals>
+                    </execution>
+                </executions>
+            </plugin>
+            <plugin>
+                <groupId>org.apache.rat</groupId>
+                <artifactId>apache-rat-plugin</artifactId>
+                <version>0.12</version>
+                <configuration>
+                    <excludes>
+                        <exclude>README.md</exclude>
+                    </excludes>
+                </configuration>
+            </plugin>
+            <plugin>
+                <artifactId>maven-javadoc-plugin</artifactId>
+                <version>2.10.4</version>
+                <configuration>
+                    <charset>UTF-8</charset>
+                    <locale>en_US</locale>
+                    <excludePackageNames>io.openmessaging.internal</excludePackageNames>
+                </configuration>
+                <executions>
+                    <execution>
+                        <id>aggregate</id>
+                        <goals>
+                            <goal>aggregate</goal>
+                        </goals>
+                        <phase>site</phase>
+                    </execution>
+                </executions>
+            </plugin>
+            <plugin>
+                <artifactId>maven-resources-plugin</artifactId>
+                <version>3.0.2</version>
+                <configuration>
+                    <encoding>${project.build.sourceEncoding}</encoding>
+                </configuration>
+            </plugin>
+            <plugin>
+                <groupId>org.codehaus.mojo</groupId>
+                <artifactId>findbugs-maven-plugin</artifactId>
+                <version>3.0.4</version>
+            </plugin>
+            <plugin>
+                <artifactId>maven-assembly-plugin</artifactId>
+                <version>3.0.0</version>
+                <configuration>
+                    <archive>
+                        <!-- The Main Class Here doesn't make a lot sense since it was dynamically loaded-->
+                        <manifest>
+                            <mainClass>org.apache.rocketmq.connect.cassandra.connector.CassandraSinkConnector</mainClass>
+                        </manifest>
+                    </archive>
+                    <descriptorRefs>
+                        <descriptorRef>jar-with-dependencies</descriptorRef>
+                    </descriptorRefs>
+                </configuration>
+                <executions>
+                    <execution>
+                        <id>make-assembly</id>
+                        <phase>package</phase>
+                        <goals>
+                            <goal>single</goal>
+                        </goals>
+                    </execution>
+                </executions>
+            </plugin>
+        </plugins>
+    </build>
+
+    <dependencies>
+        <dependency>
+            <groupId>junit</groupId>
+            <artifactId>junit</artifactId>
+            <version>4.11</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.assertj</groupId>
+            <artifactId>assertj-core</artifactId>
+            <version>2.6.0</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.mockito</groupId>
+            <artifactId>mockito-core</artifactId>
+            <version>2.6.3</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>commons-codec</groupId>
+            <artifactId>commons-codec</artifactId>
+            <version>1.12</version>
+        </dependency>
+        <dependency>
+            <groupId>io.openmessaging</groupId>
+            <artifactId>openmessaging-connector</artifactId>
+            <version>0.1.1</version>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
+            <groupId>io.openmessaging</groupId>
+            <artifactId>openmessaging-api</artifactId>
+            <version>0.3.1-alpha</version>
+        </dependency>
+        <dependency>
+            <groupId>com.alibaba</groupId>
+            <artifactId>fastjson</artifactId>
+            <version>1.2.60</version>
+        </dependency>
+        <dependency>
+            <groupId>org.slf4j</groupId>
+            <artifactId>slf4j-api</artifactId>
+            <version>1.7.7</version>
+        </dependency>
+        <dependency>
+            <groupId>ch.qos.logback</groupId>
+            <artifactId>logback-classic</artifactId>
+            <version>1.0.13</version>
+        </dependency>
+        <dependency>
+            <groupId>ch.qos.logback</groupId>
+            <artifactId>logback-core</artifactId>
+            <version>1.0.13</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.rocketmq</groupId>
+            <artifactId>rocketmq-client</artifactId>
+            <version>${rocketmq.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.rocketmq</groupId>
+            <artifactId>rocketmq-tools</artifactId>
+            <version>${rocketmq.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.rocketmq</groupId>
+            <artifactId>rocketmq-remoting</artifactId>
+            <version>${rocketmq.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.rocketmq</groupId>
+            <artifactId>rocketmq-openmessaging</artifactId>
+            <version>4.3.2</version>
+        </dependency>
+        <dependency>
+            <groupId>commons-cli</groupId>
+            <artifactId>commons-cli</artifactId>
+            <version>1.2</version>
+        </dependency>
+        <dependency>
+            <groupId>io.javalin</groupId>
+            <artifactId>javalin</artifactId>
+            <version>1.3.0</version>
+        </dependency>
+        <dependency>
+            <groupId>com.alibaba</groupId>
+            <artifactId>druid</artifactId>
+            <version>1.0.31</version>
+        </dependency>
+        <dependency>
+            <groupId>com.datastax.oss</groupId>
+            <artifactId>java-driver-core-shaded</artifactId>
+            <version>4.5.1</version>
+        </dependency>
+        <dependency>
+            <groupId>com.datastax.oss</groupId>
+            <artifactId>java-driver-query-builder</artifactId>
+            <version>4.5.1</version>
+        </dependency>
+    </dependencies>
+
+</project>
diff --git a/scripts/gen_data.py b/scripts/gen_data.py
new file mode 100644
index 0000000..8fde504
--- /dev/null
+++ b/scripts/gen_data.py
@@ -0,0 +1,155 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+
+from sqlalchemy import *
+from sqlalchemy.ext.declarative import declarative_base
+from sqlalchemy import Column, String
+from sqlalchemy.orm import sessionmaker
+import argparse
+import random
+import string
+import datetime
+import time
+import uuid
+
+
+
+Base = declarative_base()
+class JdbcCassandra(Base):
+    __tablename__ = 'jdbc_cassandra'
+    int_type = Column(INT, primary_key=True, autoincrement=True)
+    ascii_type = Column(VARCHAR(50))
+    boolean_type = Column(BOOLEAN)
+    date_type = Column(DATE)
+    decimal_type = Column(DECIMAL)
+    double_type = Column(FLOAT)
+    float_type = Column(FLOAT)
+    inet_type = Column(VARCHAR(50))
+    smallint_type = Column(SMALLINT)
+    time_type  = Column(TIME)
+    text_type = Column(TEXT)
+    timestamp_type  = Column(TIMESTAMP)
+    #timeuuid_type  = Column(CHAR(36))
+    tinyint_type  = Column(SMALLINT)
+    #uuid_type  = Column(CHAR(36))
+    varchar_type  = Column(VARCHAR(50))
+    varint_type  = Column(BIGINT)
+
+
+
+def random_string(strlen=8):
+    letters = string.ascii_lowercase
+    return bytes(''.join(random.choice(letters) for i in range(strlen)), 'utf8') 
+
+
+def str_time_prop(start, end, format, prop):
+    """Get a time at a proportion of a range of two formatted times.
+
+    start and end should be strings specifying times formated in the
+    given format (strftime-style), giving an interval [start, end].
+    prop specifies how a proportion of the interval to be taken after
+    start.  The returned time will be in the specified format.
+    """
+
+    stime = time.mktime(time.strptime(start, format))
+    etime = time.mktime(time.strptime(end, format))
+
+    ptime = stime + prop * (etime - stime)
+
+    return time.strftime(format, time.localtime(ptime))
+
+
+def random_date(start, end, prop):
+    return bytes(str_time_prop(start, end, '%m/%d/%Y %I:%M %p', prop), 'utf8')
+
+def random_time(start, end, prop):
+    return bytes(str_time_prop(start, end, '%Y-%m-%d %H:%M:%S', prop), 'utf8')
+
+def format_time():
+    t = datetime.datetime.now()
+    s = t.strftime('%Y-%m-%d %H:%M:%S.%f')
+    return s[:-3]
+
+def main():
+    
+    # define parser
+    parser = argparse.ArgumentParser()
+    parser.add_argument('hostname', metavar='HOSTNAME', type=str,
+                        help='hostname of target mysql database ')
+    parser.add_argument('port', metavar='PORT', type=str,
+                        help='port of target mysql database')
+    parser.add_argument('username', metavar='USERNAME', type=str,
+                        help='username of the user connecting to mysql')
+    parser.add_argument('password', metavar='PASSWORD', type=str,
+                        help='password of specified user')   
+    parser.add_argument('database', metavar='DATABASE', type=str,
+                        help='which database to connect to') 
+    parser.add_argument('count', metavar='COUNT', type=int,
+                        help='how many random records to insert into the database')                                                           
+    args = parser.parse_args()
+
+
+    # get variabless from command line
+    hostname = args.hostname
+    port = args.port
+    username = args.username
+    password = args.password
+    database = args.database
+    count = args.count
+
+
+    # create db connection
+    # print("----------------ERROR-------------")
+    # print("mysql+pymysql://{}:{}@{}:{}/{}".format(username, password, hostname, port, database))
+    engine = create_engine("mysql+pymysql://{}:{}@{}:{}/{}".format(username, password, hostname, port, database), echo=True)
+
+
+    # create table if not exist
+    Base.metadata.create_all(engine)    
+
+    # create a session
+    Session = sessionmaker(bind=engine)
+    session = Session()
+
+    for i in range(0, count):
+        random_record = JdbcCassandra(
+            ascii_type = random_string(30),
+            boolean_type = (i % 2 == 0),
+            date_type = datetime.datetime.now().date(),
+            decimal_type = 10.5,
+            double_type = 1.5,
+            float_type = 8.3,
+            inet_type = "127.0.0.1",
+            smallint_type = 1,
+            time_type = datetime.datetime.now().time(),
+            text_type = random_string(30),
+            timestamp_type = datetime.datetime.now(),
+            #timeuuid_type = str(uuid.uuid1()),
+            tinyint_type = 1,
+            #uuid_type = str(uuid.uuid1()),
+            varchar_type = random_string(30),
+            varint_type = random.getrandbits(63),
+        )
+        session.add(random_record)
+
+    session.commit()
+
+
+if __name__ == "__main__":
+    main()
\ No newline at end of file
diff --git a/scripts/requirements.txt b/scripts/requirements.txt
new file mode 100644
index 0000000..fc7dc31
--- /dev/null
+++ b/scripts/requirements.txt
@@ -0,0 +1,2 @@
+SQLAlchemy==1.3.16
+PyMySQL==0.9.3
\ No newline at end of file
diff --git a/src/main/java/org/apache/rocketmq/connect/cassandra/common/CloneUtils.java b/src/main/java/org/apache/rocketmq/connect/cassandra/common/CloneUtils.java
new file mode 100644
index 0000000..c860750
--- /dev/null
+++ b/src/main/java/org/apache/rocketmq/connect/cassandra/common/CloneUtils.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.connect.cassandra.common;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.io.Serializable;
+
+public class CloneUtils {
+    @SuppressWarnings("unchecked")
+    public static <T extends Serializable> T clone(T obj) {
+        T clonedObj = null;
+        try {
+            ByteArrayOutputStream baos = new ByteArrayOutputStream();
+            ObjectOutputStream oos = new ObjectOutputStream(baos);
+            oos.writeObject(obj);
+            oos.close();
+
+            ByteArrayInputStream bais = new ByteArrayInputStream(baos.toByteArray());
+            ObjectInputStream ois = new ObjectInputStream(bais);
+            clonedObj = (T) ois.readObject();
+            ois.close();
+        } catch (Exception e) {
+            e.printStackTrace();
+        }
+        return clonedObj;
+    }
+}
diff --git a/src/main/java/org/apache/rocketmq/connect/cassandra/common/ConstDefine.java b/src/main/java/org/apache/rocketmq/connect/cassandra/common/ConstDefine.java
new file mode 100644
index 0000000..462add2
--- /dev/null
+++ b/src/main/java/org/apache/rocketmq/connect/cassandra/common/ConstDefine.java
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.connect.cassandra.common;
+
+public class ConstDefine {
+
+    public static String CASSANDRA_CONNECTOR_ADMIN_PREFIX = "CASSANDRA-CONNECTOR-ADMIN";
+    public static final String PREFIX = "cassandra";
+}
diff --git a/src/main/java/org/apache/rocketmq/connect/cassandra/common/DBUtils.java b/src/main/java/org/apache/rocketmq/connect/cassandra/common/DBUtils.java
new file mode 100644
index 0000000..bd58eea
--- /dev/null
+++ b/src/main/java/org/apache/rocketmq/connect/cassandra/common/DBUtils.java
@@ -0,0 +1,91 @@
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.rocketmq.connect.cassandra.common;
+
+import com.datastax.oss.driver.api.core.CqlSession;
+import com.datastax.oss.driver.api.core.CqlSessionBuilder;
+import com.datastax.oss.driver.api.core.config.DefaultDriverOption;
+import com.datastax.oss.driver.api.core.config.DriverConfigLoader;
+import com.datastax.oss.driver.internal.core.auth.PlainTextAuthProvider;
+import io.netty.util.concurrent.SingleThreadEventExecutor;
+import java.io.File;
+import java.net.InetSocketAddress;
+import java.time.Duration;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import org.apache.rocketmq.connect.cassandra.config.Config;
+import org.apache.rocketmq.connect.cassandra.connector.CassandraSinkTask;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.text.SimpleDateFormat;
+import java.util.*;
+import java.util.Date;
+
+public class DBUtils {
+
+    private static final Logger log = LoggerFactory.getLogger(CassandraSinkTask.class);
+
+    public static CqlSession initCqlSession(Config config) throws Exception {
+        log.info("Trying to init Cql Session ");
+        Map<String, String> map = new HashMap<>();
+
+        String dbUrl = config.getDbUrl();
+        String dbPort = config.getDbPort();
+        String localDataCenter = config.getLocalDataCenter();
+        String username =  config.getDbUsername();
+        String password =  config.getDbPassword();
+
+//        sessionBuilder.addContactPoint(new InetSocketAddress(dbUrl, Integer.parseInt(dbPort)))
+//                      .withAuthCredentials(username, password);
+
+
+        log.info("Cassandra dbUrl: {}", dbUrl);
+        log.info("Cassandra dbPort: {}", dbPort);
+        log.info("Cassandra datacenter: {}", localDataCenter);
+        log.info("Cassandra username: {}", username);
+        log.info("Cassandra password: {}", password);
+
+        CqlSession cqlSession = null;
+        log.info("Using Program Config Loader");
+        try {
+            ExecutorService executorService = Executors.newSingleThreadExecutor();
+            Future<CqlSession> handle = executorService.submit(new Callable<CqlSession>() {
+                @Override
+                public CqlSession call() {
+                    return CqlSession.builder()
+                            .addContactPoint(new InetSocketAddress(dbUrl, Integer.valueOf(dbPort)))
+                            .withLocalDatacenter(localDataCenter)
+                            .build();
+                }
+            });
+
+            cqlSession = handle.get();
+
+        } catch (Exception e) {
+            log.info("error when creating cqlSession {}", e.getMessage());
+            e.printStackTrace();
+        }
+        log.info("init Cql Session success");
+
+        return cqlSession;
+    }
+}
diff --git a/src/main/java/org/apache/rocketmq/connect/cassandra/common/DataType.java b/src/main/java/org/apache/rocketmq/connect/cassandra/common/DataType.java
new file mode 100644
index 0000000..d6f814f
--- /dev/null
+++ b/src/main/java/org/apache/rocketmq/connect/cassandra/common/DataType.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.connect.cassandra.common;
+
+public enum DataType {
+
+    COMMON_MESSAGE,
+    TOPIC_CONFIG,
+    BROKER_CONFIG,
+    SUB_CONFIG,
+    OFFSET
+}
diff --git a/src/main/java/org/apache/rocketmq/connect/cassandra/common/Utils.java b/src/main/java/org/apache/rocketmq/connect/cassandra/common/Utils.java
new file mode 100644
index 0000000..0911e20
--- /dev/null
+++ b/src/main/java/org/apache/rocketmq/connect/cassandra/common/Utils.java
@@ -0,0 +1,76 @@
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.rocketmq.connect.cassandra.common;
+
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.rocketmq.client.exception.MQClientException;
+import org.apache.rocketmq.common.protocol.route.BrokerData;
+import org.apache.rocketmq.common.protocol.route.TopicRouteData;
+import org.apache.rocketmq.remoting.exception.RemotingException;
+import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+public class Utils {
+    private static final Logger log = LoggerFactory.getLogger(Utils.class);
+
+    public static String createGroupName(String prefix) {
+        return new StringBuilder().append(prefix).append("-").append(System.currentTimeMillis()).toString();
+    }
+
+    public static String createGroupName(String prefix, String postfix) {
+        return new StringBuilder().append(prefix).append("-").append(postfix).toString();
+    }
+
+    public static String createTaskId(String prefix) {
+        return new StringBuilder().append(prefix).append("-").append(System.currentTimeMillis()).toString();
+    }
+
+    public static String createInstanceName(String namesrvAddr) {
+        String[] namesrvArray = namesrvAddr.split(";");
+        List<String> namesrvList = new ArrayList<>();
+        for (String ns : namesrvArray) {
+            if (!namesrvList.contains(ns)) {
+                namesrvList.add(ns);
+            }
+        }
+        Collections.sort(namesrvList);
+        return String.valueOf(namesrvList.toString().hashCode());
+    }
+
+    public static List<BrokerData> examineBrokerData(DefaultMQAdminExt defaultMQAdminExt, String topic,
+        String cluster) throws RemotingException, MQClientException, InterruptedException {
+        List<BrokerData> brokerList = new ArrayList<>();
+
+        TopicRouteData topicRouteData = defaultMQAdminExt.examineTopicRouteInfo(topic);
+        if (topicRouteData.getBrokerDatas() != null) {
+            for (BrokerData broker : topicRouteData.getBrokerDatas()) {
+                if (StringUtils.equals(broker.getCluster(), cluster)) {
+                    brokerList.add(broker);
+                }
+            }
+        }
+        return brokerList;
+    }
+}
diff --git a/src/main/java/org/apache/rocketmq/connect/cassandra/config/Config.java b/src/main/java/org/apache/rocketmq/connect/cassandra/config/Config.java
new file mode 100644
index 0000000..b9b115e
--- /dev/null
+++ b/src/main/java/org/apache/rocketmq/connect/cassandra/config/Config.java
@@ -0,0 +1,282 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.connect.cassandra.config;
+
+import com.alibaba.fastjson.JSONObject;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+public class Config {
+    private static final Logger LOG = LoggerFactory.getLogger(Config.class);
+
+    /* Database Connection Config */
+    private String dbUrl;
+    private String dbPort;
+    private String localDataCenter;
+    private String dbUsername;
+    private String dbPassword;
+    private String dataType;
+    private String rocketmqTopic;
+
+    private List tableWhitelist;
+    private List tableBlacklist;
+    private String whiteDataBase;
+    private String whiteTable;
+
+
+    public static final String CONN_TASK_PARALLELISM = "task-parallelism";
+    public static final String CONN_TASK_DIVIDE_STRATEGY = "task-divide-strategy";
+    public static final String CONN_WHITE_LIST = "whiteDataBase";
+    public static final String CONN_SOURCE_RECORD_CONVERTER = "source-record-converter";
+    public static final String CONN_DB_IP = "dbUrl";
+    public static final String CONN_DB_PORT = "dbPort";
+    public static final String CONN_DB_USERNAME = "dbUsername";
+    public static final String CONN_DB_PASSWORD = "dbPassword";
+    public static final String CONN_DB_DATACENTER = "localDataCenter";
+    public static final String CONN_DATA_TYPE = "dataType";
+    public static final String CONN_TOPIC_NAMES = "topicNames";
+    public static final String CONN_DB_MODE = "mode";
+
+    public static final String CONN_SOURCE_RMQ = "source-rocketmq";
+    public static final String CONN_SOURCE_CLUSTER = "source-cluster";
+    public static final String REFRESH_INTERVAL = "refresh.interval";
+
+    /* Mode Config */
+    private String mode = "";
+    private String incrementingColumnName = "";
+    private String query = "";
+    private String timestampColmnName = "";
+    private boolean validateNonNull = true;
+
+    /*Connector config*/
+    private String tableTypes = "table";
+    private long pollInterval = 5000;
+    private int batchMaxRows = 100;
+    private long tablePollInterval = 60000;
+    private long timestampDelayInterval = 0;
+    private String dbTimezone = "GMT+8";
+    private String queueName;
+
+    private Logger log = LoggerFactory.getLogger(Config.class);
+    public static final Set<String> REQUEST_CONFIG = new HashSet<String>() {
+        {
+            add("dbUrl");
+            add("dbPort");
+            add("localDataCenter");
+            add("dbUsername");
+            add("dbPassword");
+            add("mode");
+            add("rocketmqTopic");
+        }
+    };
+
+
+
+    public static Logger getLOG() {
+        return LOG;
+    }
+
+    public String getDbUrl() { return dbUrl; }
+
+    public void setDbUrl(String dbUrl) { this.dbUrl = dbUrl; }
+
+    public String getDbPort() { return dbPort; }
+
+    public void setDbPort(String dbPort) { this.dbPort = dbPort; }
+
+    public String getLocalDataCenter() {
+        return localDataCenter;
+    }
+
+    public void setLocalDataCenter(String localDataCenter) {
+        this.localDataCenter = localDataCenter;
+    }
+
+    public String getDbUsername() {
+        return dbUsername;
+    }
+
+    public void setDbUsername(String dbUsername) {
+        this.dbUsername = dbUsername;
+    }
+
+    public String getDbPassword() {
+        return dbPassword;
+    }
+
+    public void setDbPassword(String dbPassword) {
+        this.dbPassword = dbPassword;
+    }
+
+    public String getDataType() {
+        return dataType;
+    }
+
+    public void setDataType(String dataType) {
+        this.dataType = dataType;
+    }
+
+    public String getRocketmqTopic() {
+        return rocketmqTopic;
+    }
+
+    public void setRocketmqTopic(String rocketmqTopic) {
+        this.rocketmqTopic = rocketmqTopic;
+    }
+
+    public String getMode() {
+        return mode;
+    }
+
+    public void setMode(String mode) {
+        this.mode = mode;
+    }
+
+    public String getIncrementingColumnName() {
+        return incrementingColumnName;
+    }
+
+    public void setIncrementingColumnName(String incrementingColumnName) {
+        this.incrementingColumnName = incrementingColumnName;
+    }
+
+    public String getQuery() {
+        return query;
+    }
+
+    public void setQuery(String query) {
+        this.query = query;
+    }
+
+    public String getTimestampColmnName() {
+        return timestampColmnName;
+    }
+
+    public void setTimestampColmnName(String timestampColmnName) {
+        this.timestampColmnName = timestampColmnName;
+    }
+
+    public boolean isValidateNonNull() {
+        return validateNonNull;
+    }
+
+    public void setValidateNonNull(boolean validateNonNull) {
+        this.validateNonNull = validateNonNull;
+    }
+
+    public String getTableTypes() {
+        return tableTypes;
+    }
+
+    public void setTableTypes(String tableTypes) {
+        this.tableTypes = tableTypes;
+    }
+
+    public long getPollInterval() {
+        return pollInterval;
+    }
+
+    public void setPollInterval(long pollInterval) {
+        this.pollInterval = pollInterval;
+    }
+
+    public int getBatchMaxRows() {
+        return batchMaxRows;
+    }
+
+    public void setBatchMaxRows(int batchMaxRows) {
+        this.batchMaxRows = batchMaxRows;
+    }
+
+    public long getTablePollInterval() {
+        return tablePollInterval;
+    }
+
+    public void setTablePollInterval(long tablePollInterval) {
+        this.tablePollInterval = tablePollInterval;
+    }
+
+    public long getTimestampDelayInterval() {
+        return timestampDelayInterval;
+    }
+
+    public void setTimestampDelayInterval(long timestampDelayInterval) {
+        this.timestampDelayInterval = timestampDelayInterval;
+    }
+
+    public String getDbTimezone() {
+        return dbTimezone;
+    }
+
+    public void setDbTimezone(String dbTimezone) {
+        this.dbTimezone = dbTimezone;
+    }
+
+    public String getQueueName() {
+        return queueName;
+    }
+
+    public void setQueueName(String queueName) {
+        this.queueName = queueName;
+    }
+
+    public Logger getLog() {
+        return log;
+    }
+
+    public void setLog(Logger log) {
+        this.log = log;
+    }
+
+    public List getTableWhitelist() {
+        return tableWhitelist;
+    }
+
+    public void setTableWhitelist(List tableWhitelist) {
+        this.tableWhitelist = tableWhitelist;
+    }
+
+    public List getTableBlacklist() {
+        return tableBlacklist;
+    }
+
+    public void setTableBlacklist(List tableBlacklist) {
+        this.tableBlacklist = tableBlacklist;
+    }
+
+    public String getWhiteDataBase() {
+        return whiteDataBase;
+    }
+
+    public void setWhiteDataBase(String whiteDataBase) {
+        this.whiteDataBase = whiteDataBase;
+    }
+
+    public String getWhiteTable() {
+        return whiteTable;
+    }
+
+    public void setWhiteTable(String whiteTable) {
+        this.whiteTable = whiteTable;
+    }
+
+}
\ No newline at end of file
diff --git a/src/main/java/org/apache/rocketmq/connect/cassandra/config/ConfigUtil.java b/src/main/java/org/apache/rocketmq/connect/cassandra/config/ConfigUtil.java
new file mode 100644
index 0000000..1c08fb2
--- /dev/null
+++ b/src/main/java/org/apache/rocketmq/connect/cassandra/config/ConfigUtil.java
@@ -0,0 +1,70 @@
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.rocketmq.connect.cassandra.config;
+
+import io.openmessaging.KeyValue;
+import java.lang.reflect.Method;
+
+public class ConfigUtil {
+    public static <T> void load(KeyValue props, Object object) {
+
+        properties2Object(props, object);
+    }
+
+    private static <T> void properties2Object(final KeyValue p, final Object object) {
+
+        Method[] methods = object.getClass().getMethods();
+        for (Method method : methods) {
+            String mn = method.getName();
+            if (mn.startsWith("set")) {
+                try {
+                    String tmp = mn.substring(4);
+                    String first = mn.substring(3, 4);
+
+                    String key = first.toLowerCase() + tmp;
+                    String property = p.getString(key);
+                    if (property != null) {
+                        Class<?>[] pt = method.getParameterTypes();
+                        if (pt != null && pt.length > 0) {
+                            String cn = pt[0].getSimpleName();
+                            Object arg;
+                            if (cn.equals("int") || cn.equals("Integer")) {
+                                arg = Integer.parseInt(property);
+                            } else if (cn.equals("long") || cn.equals("Long")) {
+                                arg = Long.parseLong(property);
+                            } else if (cn.equals("double") || cn.equals("Double")) {
+                                arg = Double.parseDouble(property);
+                            } else if (cn.equals("boolean") || cn.equals("Boolean")) {
+                                arg = Boolean.parseBoolean(property);
+                            } else if (cn.equals("float") || cn.equals("Float")) {
+                                arg = Float.parseFloat(property);
+                            } else if (cn.equals("String")) {
+                                arg = property;
+                            } else {
+                                continue;
+                            }
+                            method.invoke(object, arg);
+                        }
+                    }
+                } catch (Throwable ignored) {
+                }
+            }
+        }
+    }
+}
diff --git a/src/main/java/org/apache/rocketmq/connect/cassandra/config/DbConnectorConfig.java b/src/main/java/org/apache/rocketmq/connect/cassandra/config/DbConnectorConfig.java
new file mode 100644
index 0000000..3dd25c0
--- /dev/null
+++ b/src/main/java/org/apache/rocketmq/connect/cassandra/config/DbConnectorConfig.java
@@ -0,0 +1,110 @@
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.rocketmq.connect.cassandra.config;
+
+import io.openmessaging.KeyValue;
+import org.apache.rocketmq.connect.cassandra.strategy.TaskDivideStrategy;
+
+public abstract class DbConnectorConfig {
+    public TaskDivideStrategy taskDivideStrategy;
+    public String dbUrl;
+    public String dbPort;
+    public String dbUserName;
+    public String dbPassword;
+    public String localDataCenter;
+    public String converter;
+    public int taskParallelism;
+    public String mode;
+
+    public abstract void validate(KeyValue config);
+
+    public abstract <T> T getWhiteTopics();
+
+    public TaskDivideStrategy getTaskDivideStrategy() {
+        return taskDivideStrategy;
+    }
+
+    public void setTaskDivideStrategy(TaskDivideStrategy taskDivideStrategy) {
+        this.taskDivideStrategy = taskDivideStrategy;
+    }
+
+    public String getDbUrl() {
+        return dbUrl;
+    }
+
+    public void setDbUrl(String dbUrl) {
+        this.dbUrl = dbUrl;
+    }
+
+    public String getDbPort() {
+        return dbPort;
+    }
+
+    public void setDbPort(String dbPort) {
+        this.dbPort = dbPort;
+    }
+
+    public String getDbUserName() {
+        return dbUserName;
+    }
+
+    public void setDbUserName(String dbUserName) {
+        this.dbUserName = dbUserName;
+    }
+
+    public String getDbPassword() {
+        return dbPassword;
+    }
+
+    public void setDbPassword(String dbPassword) {
+        this.dbPassword = dbPassword;
+    }
+
+    public String getLocalDataCenter() {
+        return localDataCenter;
+    }
+
+    public void setLocalDataCenter(String localDataCenter) {
+        this.localDataCenter = localDataCenter;
+    }
+
+    public String getConverter() {
+        return converter;
+    }
+
+    public void setConverter(String converter) {
+        this.converter = converter;
+    }
+
+    public int getTaskParallelism() {
+        return taskParallelism;
+    }
+
+    public void setTaskParallelism(int taskParallelism) {
+        this.taskParallelism = taskParallelism;
+    }
+
+    public String getMode() {
+        return mode;
+    }
+
+    public void setMode(String mode) {
+        this.mode = mode;
+    }
+}
diff --git a/src/main/java/org/apache/rocketmq/connect/cassandra/config/SinkDbConnectorConfig.java b/src/main/java/org/apache/rocketmq/connect/cassandra/config/SinkDbConnectorConfig.java
new file mode 100644
index 0000000..3145033
--- /dev/null
+++ b/src/main/java/org/apache/rocketmq/connect/cassandra/config/SinkDbConnectorConfig.java
@@ -0,0 +1,112 @@
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.rocketmq.connect.cassandra.config;
+
+import io.openmessaging.KeyValue;
+import org.apache.rocketmq.connect.cassandra.strategy.DivideStrategyEnum;
+import org.apache.rocketmq.connect.cassandra.strategy.DivideTaskByTopic;
+
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+
+public class SinkDbConnectorConfig extends DbConnectorConfig {
+    private Set<String> whiteList;
+    private String srcNamesrvs;
+    private String srcCluster;
+    private long refreshInterval;
+    private Map<String, Set<TaskTopicInfo>> topicRouteMap;
+
+    public SinkDbConnectorConfig(){
+    }
+
+    @Override
+    public void validate(KeyValue config) {
+        this.taskParallelism = config.getInt(Config.CONN_TASK_PARALLELISM, 1);
+
+        int strategy = config.getInt(Config.CONN_TASK_DIVIDE_STRATEGY, DivideStrategyEnum.BY_TOPIC.ordinal());
+
+        this.taskDivideStrategy = new DivideTaskByTopic();
+
+        buildWhiteList(config);
+
+        this.converter = config.getString(Config.CONN_SOURCE_RECORD_CONVERTER);
+        this.dbUrl = config.getString(Config.CONN_DB_IP);
+        this.dbPort = config.getString(Config.CONN_DB_PORT);
+        this.dbUserName = config.getString(Config.CONN_DB_USERNAME);
+        this.dbPassword = config.getString(Config.CONN_DB_PASSWORD);
+        this.localDataCenter = config.getString(Config.CONN_DB_DATACENTER);
+        this.srcNamesrvs = config.getString(Config.CONN_SOURCE_RMQ);
+        this.srcCluster = config.getString(Config.CONN_SOURCE_CLUSTER);
+        this.refreshInterval = config.getLong(Config.REFRESH_INTERVAL, 3);
+        this.mode = config.getString(Config.CONN_DB_MODE, "bulk");
+
+    }
+
+    private void buildWhiteList(KeyValue config) {
+        this.whiteList = new HashSet<>();
+        String whiteListStr = config.getString(Config.CONN_TOPIC_NAMES, "");
+        String[] wl = whiteListStr.trim().split(",");
+        if (wl.length <= 0)
+            throw new IllegalArgumentException("White list must be not empty.");
+        else {
+            this.whiteList.clear();
+            for (String t : wl) {
+                this.whiteList.add(t.trim());
+            }
+        }
+    }
+
+
+    public Set<String> getWhiteList() {
+        return whiteList;
+    }
+
+    public void setWhiteList(Set<String> whiteList) {
+        this.whiteList = whiteList;
+    }
+
+    public String getSrcNamesrvs() {
+        return this.srcNamesrvs;
+    }
+
+    public String getSrcCluster() {
+        return this.srcCluster;
+    }
+
+    public long getRefreshInterval() {
+        return this.refreshInterval;
+    }
+
+    public Map<String, Set<TaskTopicInfo>> getTopicRouteMap() {
+        return topicRouteMap;
+    }
+
+    public void setTopicRouteMap(Map<String, Set<TaskTopicInfo>> topicRouteMap) {
+        this.topicRouteMap = topicRouteMap;
+    }
+
+    @Override
+    public Set<String> getWhiteTopics() {
+        return getWhiteList();
+    }
+
+}
diff --git a/src/main/java/org/apache/rocketmq/connect/cassandra/config/SourceDbConnectorConfig.java b/src/main/java/org/apache/rocketmq/connect/cassandra/config/SourceDbConnectorConfig.java
new file mode 100644
index 0000000..6a3f685
--- /dev/null
+++ b/src/main/java/org/apache/rocketmq/connect/cassandra/config/SourceDbConnectorConfig.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.connect.cassandra.config;
+
+import com.alibaba.fastjson.JSONObject;
+import io.openmessaging.KeyValue;
+import org.apache.rocketmq.connect.cassandra.strategy.DivideStrategyEnum;
+import org.apache.rocketmq.connect.cassandra.strategy.DivideTaskByTopic;
+
+import java.util.HashMap;
+import java.util.Map;
+
+public class SourceDbConnectorConfig extends DbConnectorConfig{
+
+    private Map<String, String> whiteMap;
+
+    public SourceDbConnectorConfig(){
+    }
+
+    @Override
+    public void validate(KeyValue config) {
+        this.taskParallelism = config.getInt(Config.CONN_TASK_PARALLELISM, 1);
+
+        int strategy = config.getInt(Config.CONN_TASK_DIVIDE_STRATEGY, DivideStrategyEnum.BY_TOPIC.ordinal());
+
+        this.taskDivideStrategy = new DivideTaskByTopic();
+
+        buildWhiteMap(config);
+
+        this.converter = config.getString(Config.CONN_SOURCE_RECORD_CONVERTER);
+        this.dbUrl = config.getString(Config.CONN_DB_IP);
+        this.dbPort = config.getString(Config.CONN_DB_PORT);
+        this.dbUserName = config.getString(Config.CONN_DB_USERNAME);
+        this.dbPassword = config.getString(Config.CONN_DB_PASSWORD);
+        this.localDataCenter = config.getString(Config.CONN_DB_DATACENTER);
+        this.mode = config.getString(Config.CONN_DB_MODE, "bulk");
+
+    }
+
+    private void buildWhiteMap(KeyValue config) {
+        this.whiteMap = new HashMap<>(16);
+        String whiteListStr = config.getString(Config.CONN_WHITE_LIST, "");
+        JSONObject whiteDataBaseObject = JSONObject.parseObject(whiteListStr);
+        if(whiteDataBaseObject.keySet().size() <= 0){
+            throw new IllegalArgumentException("white data base must be not empty.");
+        }else {
+            this.whiteMap.clear();
+            for (String dbName : whiteDataBaseObject.keySet()){
+                JSONObject whiteTableObject = (JSONObject) whiteDataBaseObject.get(dbName);
+                for (String tableName : whiteTableObject.keySet()){
+                    String dbTableKey = dbName + "-" + tableName;
+                    this.whiteMap.put(dbTableKey, whiteTableObject.getString(tableName));
+                }
+            }
+        }
+    }
+
+
+    public Map<String, String> getWhiteMap() {
+        return whiteMap;
+    }
+
+    public void setWhiteMap(Map<String, String> whiteMap) {
+        this.whiteMap = whiteMap;
+    }
+
+    @Override
+    public Map<String, String> getWhiteTopics() {
+        return getWhiteMap();
+    }
+
+}
diff --git a/src/main/java/org/apache/rocketmq/connect/cassandra/config/TaskDivideConfig.java b/src/main/java/org/apache/rocketmq/connect/cassandra/config/TaskDivideConfig.java
new file mode 100644
index 0000000..7c43137
--- /dev/null
+++ b/src/main/java/org/apache/rocketmq/connect/cassandra/config/TaskDivideConfig.java
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.connect.cassandra.config;
+
+public class TaskDivideConfig {
+
+    private String dbUrl;
+
+    private String dbPort;
+
+    private String dbUserName;
+
+    private String dbPassword;
+
+    private String localDataCenter;
+
+    private String srcRecordConverter;
+
+    private int dataType;
+
+    private int taskParallelism;
+
+    private String mode;
+
+    public TaskDivideConfig(String dbUrl, String dbPort, String dbUserName, String dbPassword, String localDataCenter,
+                            String srcRecordConverter, int dataType, int taskParallelism, String mode) {
+        this.dbUrl = dbUrl;
+        this.dbPort = dbPort;
+        this.dbUserName = dbUserName;
+        this.dbPassword = dbPassword;
+        this.localDataCenter = localDataCenter;
+        this.srcRecordConverter = srcRecordConverter;
+        this.dataType = dataType;
+        this.taskParallelism = taskParallelism;
+        this.mode = mode;
+    }
+
+    public String getDbUrl() {
+        return dbUrl;
+    }
+
+    public void setDbUrl(String dbUrl) {
+        this.dbUrl = dbUrl;
+    }
+
+    public String getDbPort() {
+        return dbPort;
+    }
+
+    public void setDbPort(String dbPort) {
+        this.dbPort = dbPort;
+    }
+
+    public String getDbUserName() {
+        return dbUserName;
+    }
+
+    public void setDbUserName(String dbUserName) {
+        this.dbUserName = dbUserName;
+    }
+
+    public String getDbPassword() {
+        return dbPassword;
+    }
+
+    public void setDbPassword(String dbPassword) {
+        this.dbPassword = dbPassword;
+    }
+
+    public String getLocalDataCenter() {
+        return localDataCenter;
+    }
+
+    public void setLocalDataCenter(String localDataCenter) {
+        this.localDataCenter = localDataCenter;
+    }
+
+    public String getSrcRecordConverter() {
+        return srcRecordConverter;
+    }
+
+    public void setSrcRecordConverter(String srcRecordConverter) {
+        this.srcRecordConverter = srcRecordConverter;
+    }
+
+    public int getDataType() {
+        return dataType;
+    }
+
+    public void setDataType(int dataType) {
+        this.dataType = dataType;
+    }
+
+    public int getTaskParallelism() {
+        return taskParallelism;
+    }
+
+    public void setTaskParallelism(int taskParallelism) {
+        this.taskParallelism = taskParallelism;
+    }
+
+    public String getMode() {
+        return mode;
+    }
+
+    public void setMode(String mode) {
+        this.mode = mode;
+    }
+}
diff --git a/src/main/java/org/apache/rocketmq/connect/cassandra/config/TaskTopicInfo.java b/src/main/java/org/apache/rocketmq/connect/cassandra/config/TaskTopicInfo.java
new file mode 100644
index 0000000..074faab
--- /dev/null
+++ b/src/main/java/org/apache/rocketmq/connect/cassandra/config/TaskTopicInfo.java
@@ -0,0 +1,40 @@
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+
+package org.apache.rocketmq.connect.cassandra.config;
+
+import org.apache.rocketmq.common.message.MessageQueue;
+
+public class TaskTopicInfo extends MessageQueue {
+
+    private String targetTopic;
+
+    public TaskTopicInfo(String sourceTopic, String brokerName, int queueId, String targetTopic) {
+        super(sourceTopic, brokerName, queueId);
+        this.targetTopic = targetTopic;
+    }
+
+    public String getTargetTopic() {
+        return this.targetTopic;
+    }
+
+    public void setTargetTopic(String targetTopic) {
+        this.targetTopic = targetTopic;
+    }
+}
diff --git a/src/main/java/org/apache/rocketmq/connect/cassandra/connector/CassandraSinkConnector.java b/src/main/java/org/apache/rocketmq/connect/cassandra/connector/CassandraSinkConnector.java
new file mode 100644
index 0000000..6ce23f6
--- /dev/null
+++ b/src/main/java/org/apache/rocketmq/connect/cassandra/connector/CassandraSinkConnector.java
@@ -0,0 +1,240 @@
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.connect.cassandra.connector;
+
+
+import io.openmessaging.KeyValue;
+import io.openmessaging.connector.api.Task;
+import io.openmessaging.connector.api.sink.SinkConnector;
+import java.util.concurrent.ScheduledFuture;
+import org.apache.commons.lang3.concurrent.BasicThreadFactory;
+import org.apache.rocketmq.client.exception.MQClientException;
+import org.apache.rocketmq.common.protocol.route.BrokerData;
+import org.apache.rocketmq.common.protocol.route.QueueData;
+import org.apache.rocketmq.common.protocol.route.TopicRouteData;
+import org.apache.rocketmq.connect.cassandra.common.CloneUtils;
+import org.apache.rocketmq.connect.cassandra.common.ConstDefine;
+import org.apache.rocketmq.connect.cassandra.common.DataType;
+import org.apache.rocketmq.connect.cassandra.common.Utils;
+import org.apache.rocketmq.connect.cassandra.config.*;
+import org.apache.rocketmq.remoting.RPCHook;
+import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.*;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+
+public class CassandraSinkConnector extends SinkConnector{
+    private static final Logger log = LoggerFactory.getLogger(CassandraSinkConnector.class);
+    private DbConnectorConfig dbConnectorConfig;
+    private volatile boolean configValid = false;
+    private ScheduledExecutorService executor;
+    private HashMap<String, Set<TaskTopicInfo>> topicRouteMap;
+
+    private DefaultMQAdminExt srcMQAdminExt;
+
+    private volatile boolean adminStarted;
+
+    private ScheduledFuture<?> listenerHandle;
+
+    public CassandraSinkConnector() {
+        topicRouteMap = new HashMap<>();
+        dbConnectorConfig = new SinkDbConnectorConfig();
+        executor = Executors.newSingleThreadScheduledExecutor(new BasicThreadFactory.Builder().namingPattern("CassandraSinkConnector-SinkWatcher-%d").daemon(true).build());
+    }
+
+    private synchronized void startMQAdminTools() {
+        if (!configValid || adminStarted) {
+            return;
+        }
+        RPCHook rpcHook = null;
+        this.srcMQAdminExt = new DefaultMQAdminExt(rpcHook);
+        this.srcMQAdminExt.setNamesrvAddr(((SinkDbConnectorConfig) this.dbConnectorConfig).getSrcNamesrvs());
+        this.srcMQAdminExt.setAdminExtGroup(Utils.createGroupName(ConstDefine.CASSANDRA_CONNECTOR_ADMIN_PREFIX));
+        this.srcMQAdminExt.setInstanceName(Utils.createInstanceName(((SinkDbConnectorConfig) this.dbConnectorConfig).getSrcNamesrvs()));
+
+        try {
+            log.info("Trying to start srcMQAdminExt");
+            this.srcMQAdminExt.start();
+            log.info("RocketMQ srcMQAdminExt started");
+
+        } catch (MQClientException e) {
+            log.error("Cassandra Sink Task start failed for `srcMQAdminExt` exception.", e);
+        }
+
+        adminStarted = true;
+    }
+
+    @Override
+    public String verifyAndSetConfig(KeyValue config) {
+        for (String requestKey : Config.REQUEST_CONFIG) {
+            if (!config.containsKey(requestKey)) {
+                return "Request config key: " + requestKey;
+            }
+        }
+        try {
+            this.dbConnectorConfig.validate(config);
+        } catch (IllegalArgumentException e) {
+            return e.getMessage();
+        }
+        this.configValid = true;
+
+        return "";
+    }
+
+    @Override
+    public void start() {
+        startMQAdminTools();
+        startListener();
+    }
+
+    public void startListener() {
+        listenerHandle = executor.scheduleAtFixedRate(new Runnable() {
+            boolean first = true;
+            HashMap<String, Set<TaskTopicInfo>> origin = null;
+
+            @Override
+            public void run() {
+                buildRoute();
+                if (first) {
+                    origin = CloneUtils.clone(topicRouteMap);
+                    first = false;
+                }
+                if (!compare(origin, topicRouteMap)) {
+                    context.requestTaskReconfiguration();
+                    origin = CloneUtils.clone(topicRouteMap);
+                }
+            }
+        }, ((SinkDbConnectorConfig) dbConnectorConfig).getRefreshInterval(), ((SinkDbConnectorConfig) dbConnectorConfig).getRefreshInterval(), TimeUnit.SECONDS);
+    }
+
+    public boolean compare(Map<String, Set<TaskTopicInfo>> origin, Map<String, Set<TaskTopicInfo>> updated) {
+        if (origin.size() != updated.size()) {
+            return false;
+        }
+        for (Map.Entry<String, Set<TaskTopicInfo>> entry : origin.entrySet()) {
+            if (!updated.containsKey(entry.getKey())) {
+                return false;
+            }
+            Set<TaskTopicInfo> originTasks = entry.getValue();
+            Set<TaskTopicInfo> updateTasks = updated.get(entry.getKey());
+            if (originTasks.size() != updateTasks.size()) {
+                return false;
+            }
+
+            if (!originTasks.containsAll(updateTasks)) {
+                return false;
+            }
+        }
+
+        return true;
+    }
+
+    public void buildRoute() {
+        String srcCluster = ((SinkDbConnectorConfig) this.dbConnectorConfig).getSrcCluster();
+        try {
+            for (String topic : ((SinkDbConnectorConfig) this.dbConnectorConfig).getWhiteList()) {
+
+                // different from BrokerData with cluster field, which can ensure the brokerData is from expected cluster.
+                // QueueData use brokerName as unique info on cluster of rocketmq. so when we want to get QueueData of
+                // expected cluster, we should get brokerNames of expected cluster, and then filter queueDatas.
+                List<BrokerData> brokerList = Utils.examineBrokerData(this.srcMQAdminExt, topic, srcCluster);
+                Set<String> brokerNameSet = new HashSet<String>();
+                for (BrokerData b : brokerList) {
+                    brokerNameSet.add(b.getBrokerName());
+                }
+
+                TopicRouteData topicRouteData = srcMQAdminExt.examineTopicRouteInfo(topic);
+                if (!topicRouteMap.containsKey(topic)) {
+                    topicRouteMap.put(topic, new HashSet<>(16));
+                }
+                for (QueueData qd : topicRouteData.getQueueDatas()) {
+                    if (brokerNameSet.contains(qd.getBrokerName())) {
+                        for (int i = 0; i < qd.getReadQueueNums(); i++) {
+                            TaskTopicInfo taskTopicInfo = new TaskTopicInfo(topic, qd.getBrokerName(), i, null);
+                            topicRouteMap.get(topic).add(taskTopicInfo);
+                        }
+                    }
+                }
+            }
+        } catch (Exception e) {
+            log.error("Fetch topic list error.", e);
+        } finally {
+            // srcMQAdminExt.shutdown();
+        }
+    }
+
+
+    /**
+     * We need to reason why we don't call srcMQAdminExt.shutdown() here, and why
+     * it can be applied to srcMQAdminExt
+     */
+    @Override
+    public void stop() {
+        listenerHandle.cancel(true);
+        // srcMQAdminExt.shutdown();
+    }
+
+    @Override
+    public void pause() {
+
+    }
+
+    @Override
+    public void resume() {
+
+    }
+
+    @Override
+    public Class<? extends Task> taskClass() {
+        return CassandraSinkTask.class;
+    }
+
+    @Override
+    public List<KeyValue> taskConfigs() {
+        log.info("List.start");
+        if (!configValid) {
+            return new ArrayList<KeyValue>();
+        }
+
+        startMQAdminTools();
+
+        buildRoute();
+
+        TaskDivideConfig tdc = new TaskDivideConfig(
+            this.dbConnectorConfig.getDbUrl(),
+            this.dbConnectorConfig.getDbPort(),
+            this.dbConnectorConfig.getDbUserName(),
+            this.dbConnectorConfig.getDbPassword(),
+            this.dbConnectorConfig.getLocalDataCenter(),
+            this.dbConnectorConfig.getConverter(),
+            DataType.COMMON_MESSAGE.ordinal(),
+            this.dbConnectorConfig.getTaskParallelism(),
+            this.dbConnectorConfig.getMode()
+        );
+
+        ((SinkDbConnectorConfig) this.dbConnectorConfig).setTopicRouteMap(topicRouteMap);
+
+        return this.dbConnectorConfig.getTaskDivideStrategy().divide(this.dbConnectorConfig, tdc);
+    }
+}
diff --git a/src/main/java/org/apache/rocketmq/connect/cassandra/connector/CassandraSinkTask.java b/src/main/java/org/apache/rocketmq/connect/cassandra/connector/CassandraSinkTask.java
new file mode 100644
index 0000000..a8e9b0a
--- /dev/null
+++ b/src/main/java/org/apache/rocketmq/connect/cassandra/connector/CassandraSinkTask.java
@@ -0,0 +1,161 @@
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.connect.cassandra.connector;
+
+import com.alibaba.fastjson.JSONObject;
+
+import com.datastax.oss.driver.api.core.CqlSession;
+import io.openmessaging.KeyValue;
+import io.openmessaging.connector.api.common.QueueMetaData;
+import io.openmessaging.connector.api.data.EntryType;
+import io.openmessaging.connector.api.data.Field;
+import io.openmessaging.connector.api.data.Schema;
+import io.openmessaging.connector.api.data.SinkDataEntry;
+import io.openmessaging.connector.api.sink.SinkTask;
+import org.apache.rocketmq.connect.cassandra.config.Config;
+import org.apache.rocketmq.connect.cassandra.common.DBUtils;
+import org.apache.rocketmq.connect.cassandra.config.ConfigUtil;
+import org.apache.rocketmq.connect.cassandra.sink.Updater;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+
+
+/**
+ * In the naming, we are using database for "keyspaces" and table for "columnFamily"
+ * This is because we kind of want the abstract data source to be aligned with SQL databases
+ */
+public class CassandraSinkTask extends SinkTask {
+    private static final Logger log = LoggerFactory.getLogger(CassandraSinkTask.class);
+
+    private Config config;
+
+    private CqlSession cqlSession;
+    private Updater updater;
+    private BlockingQueue<Updater> tableQueue = new LinkedBlockingQueue<Updater>();
+
+    public CassandraSinkTask() {
+        this.config = new Config();
+    }
+
+    @Override
+    public void put(Collection<SinkDataEntry> sinkDataEntries) {
+        try {
+            if (tableQueue.size() > 1) {
+                updater = tableQueue.poll(1000, TimeUnit.MILLISECONDS);
+            } else {
+                updater = tableQueue.peek();
+            }
+            log.info("Cassandra Sink Task trying to put()");
+            for (SinkDataEntry record : sinkDataEntries) {
+                Map<Field, Object[]> fieldMap = new HashMap<>();
+                Object[] payloads = record.getPayload();
+                Schema schema = record.getSchema();
+                EntryType entryType = record.getEntryType();
+                String cfName = schema.getName();
+                String keyspaceName = schema.getDataSource();
+                List<Field> fields = schema.getFields();
+                Boolean parseError = false;
+                if (!fields.isEmpty()) {
+                    for (Field field : fields) {
+                        Object fieldValue = payloads[field.getIndex()];
+                        Object[] value = JSONObject.parseArray((String)fieldValue).toArray();
+                        if (value.length == 2) {
+                            fieldMap.put(field, value);
+                        } else {
+                            log.error("parseArray error, fieldValue:{}", fieldValue);
+                            parseError = true;
+                        }
+                    }
+                }
+                if (!parseError) {
+                    log.info("Cassandra Sink Task trying to call updater.push()");
+                    Boolean isSuccess = updater.push(keyspaceName, cfName, fieldMap, entryType);
+                    if (!isSuccess) {
+                        log.error("push data error, keyspaceName:{}, cfName:{}, entryType:{}, fieldMap:{}", keyspaceName, cfName, fieldMap, entryType);
+                    }
+                }
+            }
+        } catch (Exception e) {
+            log.error("put sinkDataEntries error, {}", e);
+        }
+    }
+
+    @Override
+    public void commit(Map<QueueMetaData, Long> map) {
+
+    }
+
+    /**
+     * Remember always close the CqlSession according to
+     * https://docs.datastax.com/en/developer/java-driver/4.5/manual/core/
+     * @param props
+     */
+    @Override
+    public void start(KeyValue props) {
+        try {
+            ConfigUtil.load(props, this.config);
+            cqlSession = DBUtils.initCqlSession(config);
+            log.info("init data source success");
+        } catch (Exception e) {
+            log.error("Cannot start Cassandra Sink Task because of configuration error{}", e);
+        }
+        String mode = config.getMode();
+        if (mode.equals("bulk")) {
+            Updater updater = new Updater(config, cqlSession);
+            try {
+                updater.start();
+                tableQueue.add(updater);
+            } catch (Exception e) {
+                log.error("fail to start updater{}", e);
+            }
+        }
+
+    }
+
+    @Override
+    public void stop() {
+        try {
+            if (cqlSession != null){
+                cqlSession.close();
+                log.info("cassandra sink task connection is closed.");
+            }
+        } catch (Throwable e) {
+            log.warn("sink task stop error while closing connection to {}", "cassandra", e);
+        }
+    }
+
+    @Override
+    public void pause() {
+
+    }
+
+    @Override
+    public void resume() {
+
+    }
+}
diff --git a/src/main/java/org/apache/rocketmq/connect/cassandra/connector/CassandraSourceConnector.java b/src/main/java/org/apache/rocketmq/connect/cassandra/connector/CassandraSourceConnector.java
new file mode 100644
index 0000000..a8adc74
--- /dev/null
+++ b/src/main/java/org/apache/rocketmq/connect/cassandra/connector/CassandraSourceConnector.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.connect.cassandra.connector;
+
+import io.openmessaging.KeyValue;
+import io.openmessaging.connector.api.Task;
+import io.openmessaging.connector.api.source.SourceConnector;
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.rocketmq.connect.cassandra.common.DataType;
+import org.apache.rocketmq.connect.cassandra.config.Config;
+import org.apache.rocketmq.connect.cassandra.config.DbConnectorConfig;
+import org.apache.rocketmq.connect.cassandra.config.SourceDbConnectorConfig;
+import org.apache.rocketmq.connect.cassandra.config.TaskDivideConfig;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class CassandraSourceConnector extends SourceConnector {
+    private static final Logger log = LoggerFactory.getLogger(org.apache.rocketmq.connect.cassandra.connector.CassandraSourceConnector.class);
+    private DbConnectorConfig dbConnectorConfig;
+    private volatile boolean configValid = false;
+
+    public CassandraSourceConnector() {
+        dbConnectorConfig = new SourceDbConnectorConfig();
+    }
+
+    @Override
+    public String verifyAndSetConfig(KeyValue config) {
+
+        log.info("CassandraSourceConnector verifyAndSetConfig enter");
+        for (String requestKey : Config.REQUEST_CONFIG) {
+
+            if (!config.containsKey(requestKey)) {
+                return "Request config key: " + requestKey;
+            }
+        }
+        try {
+            this.dbConnectorConfig.validate(config);
+        } catch (IllegalArgumentException e) {
+            return e.getMessage();
+        }
+        this.configValid = true;
+
+        return "";
+    }
+
+    @Override
+    public void start() {
+
+    }
+
+    @Override
+    public void stop() {
+
+    }
+
+    @Override
+    public void pause() {
+
+    }
+
+    @Override
+    public void resume() {
+
+    }
+
+    @Override
+    public Class<? extends Task> taskClass() {
+        return CassandraSourceTask.class;
+    }
+
+    @Override
+    public List<KeyValue> taskConfigs() {
+        log.info("List.start");
+        if (!configValid) {
+            return new ArrayList<KeyValue>();
+        }
+
+        TaskDivideConfig tdc = new TaskDivideConfig(
+                this.dbConnectorConfig.getDbUrl(),
+                this.dbConnectorConfig.getDbPort(),
+                this.dbConnectorConfig.getDbUserName(),
+                this.dbConnectorConfig.getDbPassword(),
+                this.dbConnectorConfig.getLocalDataCenter(),
+                this.dbConnectorConfig.getConverter(),
+                DataType.COMMON_MESSAGE.ordinal(),
+                this.dbConnectorConfig.getTaskParallelism(),
+                this.dbConnectorConfig.getMode()
+        );
+        return this.dbConnectorConfig.getTaskDivideStrategy().divide(this.dbConnectorConfig, tdc);
+    }
+
+}
diff --git a/src/main/java/org/apache/rocketmq/connect/cassandra/connector/CassandraSourceTask.java b/src/main/java/org/apache/rocketmq/connect/cassandra/connector/CassandraSourceTask.java
new file mode 100644
index 0000000..cac44ed
--- /dev/null
+++ b/src/main/java/org/apache/rocketmq/connect/cassandra/connector/CassandraSourceTask.java
@@ -0,0 +1,168 @@
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.connect.cassandra.connector;
+
+import com.alibaba.fastjson.JSON;
+import com.alibaba.fastjson.JSONObject;
+import com.datastax.oss.driver.api.core.CqlSession;
+import io.openmessaging.KeyValue;
+import io.openmessaging.connector.api.data.DataEntryBuilder;
+import io.openmessaging.connector.api.data.EntryType;
+import io.openmessaging.connector.api.data.Field;
+import io.openmessaging.connector.api.data.Schema;
+import io.openmessaging.connector.api.data.SourceDataEntry;
+import io.openmessaging.connector.api.source.SourceTask;
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.Timer;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import javax.sql.DataSource;
+
+import org.apache.rocketmq.connect.cassandra.common.ConstDefine;
+import org.apache.rocketmq.connect.cassandra.common.DBUtils;
+import org.apache.rocketmq.connect.cassandra.config.Config;
+import org.apache.rocketmq.connect.cassandra.config.ConfigUtil;
+import org.apache.rocketmq.connect.cassandra.schema.Table;
+import org.apache.rocketmq.connect.cassandra.schema.column.ColumnParser;
+import org.apache.rocketmq.connect.cassandra.source.Querier;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class CassandraSourceTask extends SourceTask {
+
+    private static final Logger log = LoggerFactory.getLogger(org.apache.rocketmq.connect.cassandra.connector.CassandraSourceTask.class);
+
+    private Config config;
+
+    private DataSource dataSource;
+
+    private CqlSession cqlSession;
+
+    BlockingQueue<Querier> tableQueue = new LinkedBlockingQueue<Querier>();
+    static final String INCREMENTING_FIELD = "incrementing";
+    static final String TIMESTAMP_FIELD = "timestamp";
+    private Querier querier;
+
+    public CassandraSourceTask() {
+        this.config = new Config();
+    }
+
+    @Override
+    public Collection<SourceDataEntry> poll() {
+        List<SourceDataEntry> res = new ArrayList<>();
+        try {
+            if (tableQueue.size() > 1)
+                querier = tableQueue.poll(1000, TimeUnit.MILLISECONDS);
+            else
+                querier = tableQueue.peek();
+            Timer timer = new Timer();
+            try {
+                Thread.currentThread();
+                Thread.sleep(1000);//毫秒
+            } catch (Exception e) {
+                throw e;
+            }
+            querier.poll();
+            for (Table dataRow : querier.getList()) {
+                JSONObject jsonObject = new JSONObject();
+                jsonObject.put("nextQuery", "database");
+                jsonObject.put("nextPosition", "table");
+                Schema schema = new Schema();
+                schema.setDataSource(dataRow.getDatabase());
+                schema.setName(dataRow.getName());
+                schema.setFields(new ArrayList<>());
+                for (int i = 0; i < dataRow.getColList().size(); i++) {
+                    String columnName = dataRow.getColList().get(i);
+                    String rawDataType = dataRow.getRawDataTypeList().get(i);
+                    Field field = new Field(i, columnName, ColumnParser.mapConnectorFieldType(rawDataType));
+                    schema.getFields().add(field);
+                }
+                DataEntryBuilder dataEntryBuilder = new DataEntryBuilder(schema);
+                dataEntryBuilder.timestamp(System.currentTimeMillis()).queue(dataRow.getName())
+                        .entryType(EntryType.UPDATE);
+                for (int i = 0; i < dataRow.getColList().size(); i++) {
+                    Object[] value = new Object[2];
+                    value[0] = value[1] = dataRow.getParserList().get(i).getValue(dataRow.getDataList().get(i));
+                    dataEntryBuilder.putFiled(dataRow.getColList().get(i), JSONObject.toJSONString(value));
+                }
+
+                SourceDataEntry sourceDataEntry = dataEntryBuilder.buildSourceDataEntry(
+                        ByteBuffer.wrap((ConstDefine.PREFIX + config.getDbUrl() + config.getDbPort()).getBytes(StandardCharsets.UTF_8)),
+                        ByteBuffer.wrap(jsonObject.toJSONString().getBytes(StandardCharsets.UTF_8)));
+                res.add(sourceDataEntry);
+                log.debug("sourceDataEntry : {}", JSONObject.toJSONString(sourceDataEntry));
+            }
+        } catch (Exception e) {
+            log.error("Cassandra task poll error, current config:" + JSON.toJSONString(config), e);
+        }
+        log.debug("dataEntry poll successfully,{}", JSONObject.toJSONString(res));
+        return res;
+    }
+
+    @Override
+    public void start(KeyValue props) {
+        try {
+            ConfigUtil.load(props, this.config);
+            cqlSession = DBUtils.initCqlSession(config);
+            log.info("init data source success");
+        } catch (Exception e) {
+            log.error("Cannot start Cassandra Source Task because of configuration error{}", e);
+        }
+        Map<Map<String, String>, Map<String, Object>> offsets = null;
+        String mode = config.getMode();
+        if (mode.equals("bulk")) {
+            Querier querier = new Querier(config, cqlSession);
+            try {
+                querier.start();
+                tableQueue.add(querier);
+            } catch (Exception e) {
+                log.error("start querier failed in bulk mode{}", e);
+            }
+        }
+
+    }
+
+    @Override
+    public void stop() {
+        try {
+            if (cqlSession != null) {
+                cqlSession.close();
+                log.info("Cassandra source task connection is closed.");
+            }
+        } catch (Throwable e) {
+            log.warn("source task stop error while closing connection to {}", "Cassandra", e);
+        }
+    }
+
+    @Override
+    public void pause() {
+
+    }
+
+    @Override
+    public void resume() {
+
+    }
+}
diff --git a/src/main/java/org/apache/rocketmq/connect/cassandra/schema/Database.java b/src/main/java/org/apache/rocketmq/connect/cassandra/schema/Database.java
new file mode 100644
index 0000000..c8f69e6
--- /dev/null
+++ b/src/main/java/org/apache/rocketmq/connect/cassandra/schema/Database.java
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.rocketmq.connect.cassandra.schema;
+
+import com.datastax.oss.driver.api.core.CqlSession;
+import com.datastax.oss.driver.api.core.cql.ResultSet;
+import com.datastax.oss.driver.api.core.cql.Row;
+import com.datastax.oss.driver.api.core.cql.SimpleStatement;
+import com.datastax.oss.driver.api.querybuilder.QueryBuilder;
+import com.datastax.oss.driver.api.querybuilder.select.Select;
+import com.datastax.oss.driver.api.querybuilder.select.SelectFrom;
+import com.datastax.oss.driver.api.querybuilder.term.Term;
+import io.openmessaging.connector.api.data.FieldType;
+import org.apache.rocketmq.connect.cassandra.schema.column.ColumnParser;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+
+public class Database {
+    private static final Logger LOGGER = LoggerFactory.getLogger(Database.class);
+
+    private static final String CQL = "SELECT * FROM system_schema.columns \n" +
+            "WHERE keyspace_name = 'keyspace_name' AND table_name = 'table_name'";
+
+    private static final String SCHEMA_SYSTEM_SCHEMA = "system_schema";
+
+    private static final String TABLE_COLUMNS = "columns";
+
+    private static final String COLUMN_KEYSPACE_NAME = "keyspace_name";
+
+    private static final String COLUMN_TABLE_NAME = "table_name";
+
+    private static final String COLUMN_COLUMN_NAME = "column_name";
+
+    private static final String COLUMN_TYPE = "type";
+
+    private static final String GENERAL_CHARSET = "utf-8";
+
+    private String name;
+
+    private CqlSession cqlSession;
+
+    private Map<String, Table> tableMap = new HashMap<String, Table>();
+
+    public Set<String> tableWhiteList;
+
+    public Map<String, Map<String, String>> tableFilterMap;
+
+    public Database(String name, CqlSession cqlSession, Set<String> tableWhiteList, Map<String, Map<String, String>> tableFilterMap) {
+        this.name = name;
+        this.cqlSession = cqlSession;
+        this.tableWhiteList = tableWhiteList;
+        this.tableFilterMap = tableFilterMap;
+
+    }
+
+    public void init(){
+        Select selectFrom = QueryBuilder.selectFrom(SCHEMA_SYSTEM_SCHEMA, TABLE_COLUMNS)
+                .all()
+                .whereColumn(COLUMN_KEYSPACE_NAME)
+                .isEqualTo(QueryBuilder.literal(name));
+
+
+        SimpleStatement stmt;
+        boolean finishUpdate = false;
+        LOGGER.info("trying to execute sql query,{}", selectFrom.asCql());
+        ResultSet result = null;
+        try {
+            while (!cqlSession.isClosed() && !finishUpdate){
+                stmt = selectFrom.build();
+                result = cqlSession.execute(stmt);
+                if (result.wasApplied()) {
+                    LOGGER.info("query columns success, executed cql query {}", selectFrom.asCql());
+                }
+                finishUpdate = true;
+            }
+
+            for (Row row : result) {
+                String tableName = row.getString(COLUMN_TABLE_NAME);
+                String columnName = row.getString(COLUMN_COLUMN_NAME);
+                String columnType = row.getString(COLUMN_TYPE);
+
+
+
+                ColumnParser columnParser = ColumnParser.getColumnParser(columnType, columnType, GENERAL_CHARSET);
+
+                if (!tableWhiteList.contains(tableName)){
+                    continue;
+                }
+                if (!tableMap.containsKey(tableName)) {
+                    addTable(tableName);
+                }
+                Table table = tableMap.get(tableName);
+                table.addCol(columnName);
+                table.addParser(columnParser);
+                table.addRawDataType(columnType);
+                table.setFilterMap(tableFilterMap.get(tableName));
+            }
+        } catch (Exception e) {
+            LOGGER.error("init cassandra Schema failure,{}", e);
+        }
+
+    }
+
+    private void addTable(String tableName) {
+
+        LOGGER.info("Schema load -- DATABASE:{},\tTABLE:{}", name, tableName);
+
+        Table table = new Table(name, tableName);
+        tableMap.put(tableName, table);
+    }
+
+    public Table getTable(String tableName) {
+
+        return tableMap.get(tableName);
+    }
+
+    public Map<String, Table> getTableMap() {
+        return tableMap;
+    }
+}
diff --git a/src/main/java/org/apache/rocketmq/connect/cassandra/schema/Schema.java b/src/main/java/org/apache/rocketmq/connect/cassandra/schema/Schema.java
new file mode 100644
index 0000000..01054e0
--- /dev/null
+++ b/src/main/java/org/apache/rocketmq/connect/cassandra/schema/Schema.java
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.rocketmq.connect.cassandra.schema;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import com.datastax.oss.driver.api.core.CqlSession;
+import com.datastax.oss.driver.api.core.cql.ResultSet;
+import com.datastax.oss.driver.api.core.cql.Row;
+import com.datastax.oss.driver.api.core.cql.SimpleStatement;
+import com.datastax.oss.driver.api.querybuilder.select.Select;
+import com.datastax.oss.driver.api.querybuilder.select.SelectFrom;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.datastax.oss.driver.api.querybuilder.QueryBuilder;
+
+public class Schema {
+    private static final Logger LOGGER = LoggerFactory.getLogger(Schema.class);
+
+    private static final String CQL = "SELECT * FROM system_schema.keyspaces";
+
+    private static final String SCHEMA_SYSTEM_SCHEMA = "system_schema";
+
+    private static final String TABLE_KEYSPACES = "keyspaces";
+
+    private static final String COLUMN_KEYSPACE_NAME = "keyspace_name";
+
+    private static final List<String> IGNORED_DATABASES = new ArrayList<>(
+        Arrays.asList(new String[] {"system_schema", "system", "system_auth", "system_distributed", "system_traces"})
+    );
+
+    private CqlSession cqlSession;
+
+    private Map<String, Database> dbMap;
+
+    public Map<String, Set<String>> dbTableMap;
+
+    public Map<String, Map<String, String>> tableFilterMap;
+
+    public Schema(CqlSession cqlSession) {
+        this.cqlSession = cqlSession;
+        this.dbTableMap = new HashMap<>();
+        this.tableFilterMap = new HashMap<>();
+    }
+
+    public void load() throws Exception {
+
+        dbMap = new HashMap<>();
+
+        Select selectFrom = QueryBuilder.selectFrom(SCHEMA_SYSTEM_SCHEMA, TABLE_KEYSPACES)
+                .column(COLUMN_KEYSPACE_NAME);
+
+
+        SimpleStatement stmt;
+        boolean finishUpdate = false;
+        LOGGER.info("trying to execute sql query,{}", selectFrom.asCql());
+        ResultSet result = null;
+        try {
+            while (!cqlSession.isClosed() && !finishUpdate){
+                stmt = selectFrom.build();
+                result = cqlSession.execute(stmt);
+                if (result.wasApplied()) {
+                    LOGGER.info("update table success, executed cql query {}", selectFrom.asCql());
+                }
+                finishUpdate = true;
+            }
+
+            for (Row row : result) {
+                String dbName = row.getString(COLUMN_KEYSPACE_NAME);
+                if (!IGNORED_DATABASES.contains(dbName) && dbTableMap.keySet().contains(dbName)) {
+                    Database database = new Database(dbName, cqlSession, dbTableMap.get(dbName), tableFilterMap);
+                    dbMap.put(dbName, database);
+                }
+            }
+        } catch (Exception e) {
+            LOGGER.error("init cassandra Schema failure,{}", e);
+        }
+
+        for (Database db : dbMap.values()) {
+            db.init();
+        }
+
+    }
+
+//    public Table getTable(String dbName, String tableName) {
+//
+//        if (dbMap == null) {
+//            reload();
+//        }
+//
+//        Database database = dbMap.get(dbName);
+//        if (database == null) {
+//            return null;
+//        }
+//
+//        Table table = database.getTable(tableName);
+//        if (table == null) {
+//            return null;
+//        }
+//
+//        return table;
+//    }
+
+    private void reload() {
+
+        while (true) {
+            try {
+                load();
+                break;
+            } catch (Exception e) {
+                LOGGER.error("Cassandra Source Connector reload schema error.", e);
+            }
+        }
+    }
+
+    public void reset() {
+        dbMap = null;
+    }
+
+    public Map<String, Database> getDbMap() {
+        return dbMap;
+    }
+}
+
diff --git a/src/main/java/org/apache/rocketmq/connect/cassandra/schema/Table.java b/src/main/java/org/apache/rocketmq/connect/cassandra/schema/Table.java
new file mode 100644
index 0000000..902b797
--- /dev/null
+++ b/src/main/java/org/apache/rocketmq/connect/cassandra/schema/Table.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+
+package org.apache.rocketmq.connect.cassandra.schema;
+
+import org.apache.rocketmq.connect.cassandra.schema.column.ColumnParser;
+
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+
+public class Table {
+
+    private String database;
+    private String name;
+    private List<String> colList = new LinkedList<>();
+    private List<ColumnParser> parserList = new LinkedList<>();
+    private List<String> rawDataTypeList = new LinkedList<>();
+    private List<Object> dataList = new LinkedList<>();
+    private Map<String, String> filterMap = new HashMap<>();
+
+    public Table(String database, String table) {
+        this.database = database;
+        this.name = table;
+    }
+
+    public void addCol(String column) {
+        colList.add(column);
+    }
+
+    public void setParserList(List<ColumnParser> parserList) {
+        this.parserList = parserList;
+    }
+
+    public void setRawDataTypeList(List<String> rawDataTypeList) {
+        this.rawDataTypeList = rawDataTypeList;
+    }
+
+    public void addParser(ColumnParser columnParser) {
+        parserList.add(columnParser);
+    }
+
+    public void addRawDataType(String rawDataType) {
+        this.rawDataTypeList.add(rawDataType);
+    }
+
+    public List<String> getColList() {
+        return colList;
+    }
+
+    public List<String> getRawDataTypeList() {
+        return rawDataTypeList;
+    }
+
+    public String getDatabase() {
+        return database;
+    }
+
+    public String getName() {
+        return name;
+    }
+
+    public List<ColumnParser> getParserList() {
+        return parserList;
+    }
+
+    public List<Object> getDataList() {
+        return dataList;
+    }
+
+    public void setDataList(List<Object> dataList) {
+        this.dataList = dataList;
+    }
+
+    public void setColList(List<String> colList) {
+        this.colList = colList;
+    }
+
+    public Map<String, String> getFilterMap() {
+        return filterMap;
+    }
+
+    public void setFilterMap(Map<String, String> filterMap) {
+        this.filterMap = filterMap;
+    }
+}
\ No newline at end of file
diff --git a/src/main/java/org/apache/rocketmq/connect/cassandra/schema/column/BigIntColumnParser.java b/src/main/java/org/apache/rocketmq/connect/cassandra/schema/column/BigIntColumnParser.java
new file mode 100644
index 0000000..0506469
--- /dev/null
+++ b/src/main/java/org/apache/rocketmq/connect/cassandra/schema/column/BigIntColumnParser.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.connect.cassandra.schema.column;
+
+import java.math.BigInteger;
+
+public class BigIntColumnParser extends ColumnParser {
+
+    private static BigInteger max = BigInteger.ONE.shiftLeft(64);
+
+    private boolean signed;
+
+    public BigIntColumnParser(String colType) {
+        this.signed = !colType.matches(".* unsigned$");
+    }
+
+    @Override
+    public Object getValue(Object value) {
+
+        if (value == null) {
+            return null;
+        }
+
+        if (value instanceof BigInteger) {
+            return value;
+        }
+
+        Long l = (Long) value;
+        if (!signed && l < 0) {
+            return max.add(BigInteger.valueOf(l));
+        } else {
+            return l;
+        }
+    }
+}
diff --git a/src/main/java/org/apache/rocketmq/connect/cassandra/schema/column/BooleanColumnParser.java b/src/main/java/org/apache/rocketmq/connect/cassandra/schema/column/BooleanColumnParser.java
new file mode 100644
index 0000000..8d5b2bb
--- /dev/null
+++ b/src/main/java/org/apache/rocketmq/connect/cassandra/schema/column/BooleanColumnParser.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.rocketmq.connect.cassandra.schema.column;
+
+public class BooleanColumnParser extends ColumnParser{
+    @Override
+    public Object getValue(Object value) {
+        if (value == null) {
+            return null;
+        }
+
+        if (value instanceof Boolean) {
+            return value;
+        }
+
+        return value;
+    }
+}
diff --git a/src/main/java/org/apache/rocketmq/connect/cassandra/schema/column/ColumnParser.java b/src/main/java/org/apache/rocketmq/connect/cassandra/schema/column/ColumnParser.java
new file mode 100644
index 0000000..1eab587
--- /dev/null
+++ b/src/main/java/org/apache/rocketmq/connect/cassandra/schema/column/ColumnParser.java
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.rocketmq.connect.cassandra.schema.column;
+
+import io.openmessaging.connector.api.data.FieldType;
+
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+public abstract class ColumnParser {
+
+
+    /**
+     * Currently this class implementation is not complete yet.
+     * @param dataType
+     * @param colType
+     * @param charset
+     * @return
+     */
+    public static ColumnParser getColumnParser(String dataType, String colType, String charset) {
+
+        switch (dataType) {
+            case "tinyint":
+            case "smallint":
+            case "mediumint":
+            case "varint":
+            case "int":
+                return new IntColumnParser(dataType, colType);
+            case "bigint":
+                return new BigIntColumnParser(colType);
+            case "tinytext":
+            case "text":
+            case "mediumtext":
+            case "longtext":
+            case "varchar":
+            case "ascii":
+            case "char":
+                return new StringColumnParser(charset);
+            case "date":
+            case "datetime":
+            case "timestamp":
+                return new DateTimeColumnParser();
+            case "time":
+                return new TimeColumnParser();
+            case "year":
+                return new YearColumnParser();
+            case "enum":
+                return new EnumColumnParser(colType);
+            case "set":
+                return new SetColumnParser(colType);
+            case "boolean":
+                return new BooleanColumnParser();
+            default:
+                return new DefaultColumnParser();
+        }
+    }
+
+    public static FieldType mapConnectorFieldType(String dataType) {
+
+        switch (dataType) {
+            case "tinyint":
+            case "smallint":
+            case "mediumint":
+            case "int":
+                return FieldType.INT32;
+            case "bigint":
+                return FieldType.BIG_INTEGER;
+            case "tinytext":
+            case "text":
+            case "mediumtext":
+            case "longtext":
+            case "varchar":
+            case "char":
+                return FieldType.STRING;
+            case "date":
+            case "datetime":
+            case "timestamp":
+            case "time":
+            case "year":
+                return FieldType.DATETIME;
+            case "enum":
+                return null;
+            case "set":
+                return null;
+            default:
+                return FieldType.BYTES;
+        }
+    }
+
+    public static String[] extractEnumValues(String colType) {
+        String[] enumValues = {};
+        Matcher matcher = Pattern.compile("(enum|set)\\((.*)\\)").matcher(colType);
+        if (matcher.matches()) {
+            enumValues = matcher.group(2).replace("'", "").split(",");
+        }
+
+        return enumValues;
+    }
+
+    public abstract Object getValue(Object value);
+
+}
diff --git a/src/main/java/org/apache/rocketmq/connect/cassandra/schema/column/DateTimeColumnParser.java b/src/main/java/org/apache/rocketmq/connect/cassandra/schema/column/DateTimeColumnParser.java
new file mode 100644
index 0000000..b110a19
--- /dev/null
+++ b/src/main/java/org/apache/rocketmq/connect/cassandra/schema/column/DateTimeColumnParser.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.connect.cassandra.schema.column;
+
+import java.sql.Timestamp;
+import java.text.SimpleDateFormat;
+import java.util.Date;
+import java.util.TimeZone;
+
+public class DateTimeColumnParser extends ColumnParser {
+
+    private static SimpleDateFormat dateTimeFormat;
+    private static SimpleDateFormat dateTimeUtcFormat;
+
+    static {
+        dateTimeFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
+        dateTimeUtcFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
+        dateTimeUtcFormat.setTimeZone(TimeZone.getTimeZone("GMT+8"));
+    }
+
+    @Override
+    public Object getValue(Object value) {
+
+        if (value == null) {
+            return null;
+        }
+
+        if (value instanceof Timestamp) {
+            return dateTimeFormat.format(value);
+        }
+
+        if (value instanceof Long) {
+            return dateTimeUtcFormat.format(new Date((Long) value));
+        }
+
+        return value;
+    }
+}
diff --git a/src/main/java/org/apache/rocketmq/connect/cassandra/schema/column/DefaultColumnParser.java b/src/main/java/org/apache/rocketmq/connect/cassandra/schema/column/DefaultColumnParser.java
new file mode 100644
index 0000000..01d8d1a
--- /dev/null
+++ b/src/main/java/org/apache/rocketmq/connect/cassandra/schema/column/DefaultColumnParser.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.connect.cassandra.schema.column;
+
+import org.apache.commons.codec.binary.Base64;
+
+public class DefaultColumnParser extends ColumnParser {
+
+    @Override
+    public Object getValue(Object value) {
+
+        if (value == null) {
+            return null;
+        }
+
+        if (value instanceof byte[]) {
+            return Base64.encodeBase64String((byte[]) value);
+        }
+
+        return value;
+    }
+}
diff --git a/src/main/java/org/apache/rocketmq/connect/cassandra/schema/column/EnumColumnParser.java b/src/main/java/org/apache/rocketmq/connect/cassandra/schema/column/EnumColumnParser.java
new file mode 100644
index 0000000..245dfd6
--- /dev/null
+++ b/src/main/java/org/apache/rocketmq/connect/cassandra/schema/column/EnumColumnParser.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.connect.cassandra.schema.column;
+
+public class EnumColumnParser extends ColumnParser {
+
+    private String[] enumValues;
+
+    public EnumColumnParser(String colType) {
+        enumValues = extractEnumValues(colType);
+    }
+
+    @Override
+    public Object getValue(Object value) {
+
+        if (value == null) {
+            return null;
+        }
+
+        if (value instanceof String) {
+            return value;
+        }
+
+        Integer i = (Integer) value;
+        if (i == 0) {
+            return null;
+        } else {
+            return enumValues[i - 1];
+        }
+    }
+}
diff --git a/src/main/java/org/apache/rocketmq/connect/cassandra/schema/column/IntColumnParser.java b/src/main/java/org/apache/rocketmq/connect/cassandra/schema/column/IntColumnParser.java
new file mode 100644
index 0000000..2257682
--- /dev/null
+++ b/src/main/java/org/apache/rocketmq/connect/cassandra/schema/column/IntColumnParser.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.connect.cassandra.schema.column;
+
+public class IntColumnParser extends ColumnParser {
+
+    private int bits;
+    private boolean signed;
+
+    public IntColumnParser(String dataType, String colType) {
+
+        switch (dataType) {
+            case "tinyint":
+                bits = 8;
+                break;
+            case "smallint":
+                bits = 16;
+                break;
+            case "mediumint":
+                bits = 24;
+                break;
+            case "int":
+                bits = 32;
+        }
+
+        this.signed = !colType.matches(".* unsigned$");
+    }
+
+    @Override
+    public Object getValue(Object value) {
+
+        if (value == null) {
+            return null;
+        }
+
+        if (value instanceof Long) {
+            return value;
+        }
+
+        if (value instanceof Integer) {
+            Integer i = (Integer) value;
+            if (signed || i > 0) {
+                return i;
+            } else {
+                return (1L << bits) + i;
+            }
+        }
+
+        return value;
+    }
+}
diff --git a/src/main/java/org/apache/rocketmq/connect/cassandra/schema/column/SetColumnParser.java b/src/main/java/org/apache/rocketmq/connect/cassandra/schema/column/SetColumnParser.java
new file mode 100644
index 0000000..eaa6dad
--- /dev/null
+++ b/src/main/java/org/apache/rocketmq/connect/cassandra/schema/column/SetColumnParser.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.connect.cassandra.schema.column;
+
+public class SetColumnParser extends ColumnParser {
+
+    private String[] enumValues;
+
+    public SetColumnParser(String colType) {
+        enumValues = extractEnumValues(colType);
+    }
+
+    @Override
+    public Object getValue(Object value) {
+        if (value == null) {
+            return null;
+        }
+
+        if (value instanceof String) {
+            return value;
+        }
+
+        StringBuilder builder = new StringBuilder();
+        long l = (Long) value;
+
+        boolean needSplit = false;
+        for (int i = 0; i < enumValues.length; i++) {
+            if (((l >> i) & 1) == 1) {
+                if (needSplit)
+                    builder.append(",");
+
+                builder.append(enumValues[i]);
+                needSplit = true;
+            }
+        }
+
+        return builder.toString();
+    }
+}
diff --git a/src/main/java/org/apache/rocketmq/connect/cassandra/schema/column/StringColumnParser.java b/src/main/java/org/apache/rocketmq/connect/cassandra/schema/column/StringColumnParser.java
new file mode 100644
index 0000000..2bd7a36
--- /dev/null
+++ b/src/main/java/org/apache/rocketmq/connect/cassandra/schema/column/StringColumnParser.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.connect.cassandra.schema.column;
+
+import org.apache.commons.codec.Charsets;
+
+public class StringColumnParser extends ColumnParser {
+
+    private String charset;
+
+    public StringColumnParser(String charset) {
+        this.charset = charset.toLowerCase();
+    }
+
+    @Override
+    public Object getValue(Object value) {
+
+        if (value == null) {
+            return null;
+        }
+
+        if (value instanceof String) {
+            return value;
+        }
+
+        byte[] bytes = (byte[]) value;
+
+        switch (charset) {
+            case "utf8":
+            case "utf8mb4":
+                return new String(bytes, Charsets.UTF_8);
+            case "latin1":
+            case "ascii":
+                return new String(bytes, Charsets.ISO_8859_1);
+            case "ucs2":
+                return new String(bytes, Charsets.UTF_16);
+            default:
+                return new String(bytes, Charsets.toCharset(charset));
+
+        }
+    }
+}
diff --git a/src/main/java/org/apache/rocketmq/connect/cassandra/schema/column/TimeColumnParser.java b/src/main/java/org/apache/rocketmq/connect/cassandra/schema/column/TimeColumnParser.java
new file mode 100644
index 0000000..c812a53
--- /dev/null
+++ b/src/main/java/org/apache/rocketmq/connect/cassandra/schema/column/TimeColumnParser.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.connect.cassandra.schema.column;
+
+import java.sql.Time;
+import java.sql.Timestamp;
+
+public class TimeColumnParser extends ColumnParser {
+
+    @Override
+    public Object getValue(Object value) {
+
+        if (value == null) {
+            return null;
+        }
+
+        if (value instanceof Timestamp) {
+
+            return new Time(((Timestamp) value).getTime());
+        }
+
+        return value;
+    }
+}
diff --git a/src/main/java/org/apache/rocketmq/connect/cassandra/schema/column/YearColumnParser.java b/src/main/java/org/apache/rocketmq/connect/cassandra/schema/column/YearColumnParser.java
new file mode 100644
index 0000000..82d61a8
--- /dev/null
+++ b/src/main/java/org/apache/rocketmq/connect/cassandra/schema/column/YearColumnParser.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.connect.cassandra.schema.column;
+
+import java.sql.Date;
+import java.util.Calendar;
+
+public class YearColumnParser extends ColumnParser {
+
+    @Override
+    public Object getValue(Object value) {
+
+        if (value == null) {
+            return null;
+        }
+
+        if (value instanceof Date) {
+            Calendar calendar = Calendar.getInstance();
+            calendar.setTime((Date) value);
+            return calendar.get(Calendar.YEAR);
+        }
+
+        return value;
+    }
+}
diff --git a/src/main/java/org/apache/rocketmq/connect/cassandra/sink/Updater.java b/src/main/java/org/apache/rocketmq/connect/cassandra/sink/Updater.java
new file mode 100644
index 0000000..1c3f2c1
--- /dev/null
+++ b/src/main/java/org/apache/rocketmq/connect/cassandra/sink/Updater.java
@@ -0,0 +1,216 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.rocketmq.connect.cassandra.sink;
+
+
+import com.datastax.oss.driver.api.core.CqlSession;
+import com.datastax.oss.driver.api.core.cql.ResultSet;
+import com.datastax.oss.driver.api.core.cql.SimpleStatement;
+import com.datastax.oss.driver.api.core.type.DataType;
+import com.datastax.oss.driver.api.core.type.DataTypes;
+import com.datastax.oss.driver.api.querybuilder.QueryBuilder;
+import com.datastax.oss.driver.api.querybuilder.delete.Delete;
+import com.datastax.oss.driver.api.querybuilder.delete.DeleteSelection;
+import com.datastax.oss.driver.api.querybuilder.insert.InsertInto;
+import com.datastax.oss.driver.api.querybuilder.insert.RegularInsert;
+import com.datastax.oss.driver.api.querybuilder.term.Term;
+import io.openmessaging.connector.api.data.EntryType;
+import io.openmessaging.connector.api.data.Field;
+import io.openmessaging.connector.api.data.FieldType;
+import org.apache.rocketmq.connect.cassandra.config.Config;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+import java.util.Queue;
+import java.util.concurrent.ConcurrentLinkedQueue;
+
+public class Updater {
+
+    private final Logger log = LoggerFactory.getLogger(getClass());
+    private final Queue<CqlSession> connections = new ConcurrentLinkedQueue<>();
+    private Config config;
+    private CqlSession cqlSession;
+
+    private static final int BEFORE_UPDATE = 0;
+    private static final int AFTER_UPDATE = 1;
+
+    public Updater(Config config, CqlSession cqlSession) {
+        this.config = config;
+        this.cqlSession = cqlSession;
+    }
+
+    /**
+     * We cannot know the primary key of each table, so we have to put every field in the where clause
+     * @param dbName
+     * @param tableName
+     * @param fieldMap
+     * @param entryType
+     * @return
+     */
+    public boolean push(String dbName, String tableName, Map<Field, Object[]> fieldMap, EntryType entryType) {
+        log.info("Updater Trying to push data");
+        Boolean isSuccess = false;
+        boolean afterUpdateExist;
+        boolean beforeUpdateExist;
+        switch (entryType) {
+            case CREATE:
+                isSuccess = updateRow(dbName, tableName, fieldMap);
+                break;
+            case UPDATE:
+                isSuccess = updateRow(dbName, tableName, fieldMap);
+                break;
+            case DELETE:
+                isSuccess = deleteRow(dbName, tableName, fieldMap);
+                break;
+            default:
+                log.error("entryType {} is illegal.", entryType.toString());
+        }
+        return isSuccess;
+    }
+
+    public void start() throws Exception {
+        log.info("schema load success");
+    }
+
+    public Config getConfig() {
+        return config;
+    }
+
+    public void setConfig(Config config) {
+        this.config = config;
+    }
+
+
+    /** Since we have no way of getting the id of a record, and we cannot get the primary key list of a table,
+     * even we can it is not extensible. So we the result sql sentense would be like
+     * UPDATE dbName.tableName SET afterUpdateValues WHERE beforeUpdateValues.
+     *
+     */
+    private Boolean updateRow(String dbName, String tableName, Map<Field, Object[]> fieldMap) {
+        log.info("Updater.updateRow() get called ");
+        int count = 0;
+        InsertInto insert = QueryBuilder.insertInto(dbName, tableName);
+        RegularInsert regularInsert = null;
+        for (Map.Entry<Field, Object[]> entry : fieldMap.entrySet()) {
+            count++;
+            String fieldName = entry.getKey().getName();
+            FieldType fieldType = entry.getKey().getType();
+            Object fieldValue = entry.getValue()[1];
+            if (count == 1) {
+                regularInsert = insert.value(fieldName, buildTerm(fieldType, fieldValue));
+            }
+            else {
+                regularInsert = regularInsert.value(fieldName, buildTerm(fieldType, fieldValue));
+            }
+        }
+
+
+        SimpleStatement stmt;
+        boolean finishUpdate = false;
+        log.info("trying to execute sql query,{}", regularInsert.asCql());
+        try {
+            while (!cqlSession.isClosed() && !finishUpdate){
+                stmt = regularInsert.build();
+                ResultSet result = cqlSession.execute(stmt);
+                if (result.wasApplied()) {
+                    log.info("update table success, executed cql query {}", regularInsert.asCql());
+                    return true;
+                }
+                finishUpdate = true;
+            }
+        } catch (Exception e) {
+            log.error("update table error,{}", e);
+        }
+        return false;
+    }
+
+
+    private boolean deleteRow(String dbName, String tableName, Map<Field, Object[]> fieldMap) {
+        DeleteSelection deleteSelection = QueryBuilder.deleteFrom(dbName, tableName);
+        Delete delete = null;
+        int count = 0;
+        for (Map.Entry<Field, Object[]> entry : fieldMap.entrySet()) {
+            count++;
+            String fieldName = entry.getKey().getName();
+            FieldType fieldType = entry.getKey().getType();
+            Object fieldValue = entry.getValue()[1];
+            if (count == 1) {
+                delete = deleteSelection.whereColumn(fieldName)
+                    .isEqualTo(buildTerm(fieldType, fieldValue));
+            }
+            else {
+                delete = delete.whereColumn(fieldName)
+                    .isEqualTo(buildTerm(fieldType, fieldValue));
+            }
+        }
+
+        boolean finishDelete = false;
+        SimpleStatement stmt = delete.build();
+        try {
+            while (!cqlSession.isClosed() && !finishDelete){
+                ResultSet result = cqlSession.execute(stmt);
+                if (result.wasApplied()) {
+                    log.info("delete from table success, executed query {}", delete);
+                    return true;
+                }
+                finishDelete = true;
+            }
+        } catch (Exception e) {
+            log.error("delete from table error,{}", e);
+        }
+        return false;
+    }
+
+
+    /**
+     * Cassandra datastax driver automatically
+     * infer type from literal value, or we can use "typeHints" to
+     * tell datastax driver what type should this  literal be. We will add
+     * type hints utils once we found it is necessary to do so. For now literal
+     * inference should be enough.
+     *
+     * @param fieldType
+     * @param fieldValue
+     * @return
+     */
+    private Term buildTerm(FieldType fieldType, Object fieldValue) {
+        return QueryBuilder.literal(fieldValue);
+    }
+    private DataType typeParser(FieldType fieldType) {
+        DataType dataType = null;
+        switch (fieldType) {
+            case STRING:
+                break;
+            case DATETIME:
+                break;
+            case INT32:
+            case INT64:
+            case FLOAT32:
+            case FLOAT64:
+            case BIG_INTEGER:
+                dataType = DataTypes.BIGINT;
+                break;
+            default:
+                log.error("fieldType {} is illegal.", fieldType.toString());
+        }
+        return dataType;
+    }
+
+}
diff --git a/src/main/java/org/apache/rocketmq/connect/cassandra/source/Querier.java b/src/main/java/org/apache/rocketmq/connect/cassandra/source/Querier.java
new file mode 100644
index 0000000..8a8ddf4
--- /dev/null
+++ b/src/main/java/org/apache/rocketmq/connect/cassandra/source/Querier.java
@@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.rocketmq.connect.cassandra.source;
+
+import com.alibaba.fastjson.JSONObject;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Queue;
+import java.util.concurrent.ConcurrentLinkedQueue;
+
+import com.datastax.oss.driver.api.core.CqlSession;
+import com.datastax.oss.driver.api.core.cql.ResultSet;
+import com.datastax.oss.driver.api.core.cql.Row;
+import com.datastax.oss.driver.api.core.cql.SimpleStatement;
+import com.datastax.oss.driver.api.querybuilder.QueryBuilder;
+import com.datastax.oss.driver.api.querybuilder.select.Select;
+import com.datastax.oss.driver.api.querybuilder.select.SelectFrom;
+import org.apache.rocketmq.connect.cassandra.config.Config;
+import org.apache.rocketmq.connect.cassandra.schema.Database;
+import org.apache.rocketmq.connect.cassandra.schema.Schema;
+import org.apache.rocketmq.connect.cassandra.schema.Table;
+import org.apache.rocketmq.connect.cassandra.schema.column.ColumnParser;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class Querier {
+
+    private final Logger log = LoggerFactory.getLogger(Querier.class); // use concrete subclass
+    protected String topicPrefix;
+    private final Queue<CqlSession> cqlSessions = new ConcurrentLinkedQueue<>();
+    private Config config;
+    private CqlSession cqlSession;
+    private List<Table> list = new LinkedList<>();
+    private String mode;
+    private Schema schema;
+
+    public Querier(){
+
+    }
+
+    public Querier(Config config, CqlSession cqlSession) {
+        this.config = config;
+        this.cqlSession = cqlSession;
+        this.schema = new Schema(cqlSession);
+    }
+
+    public Config getConfig() {
+        return config;
+    }
+
+    public void setConfig(Config config) {
+        this.config = config;
+    }
+
+    public List<Table> getList() {
+        return list;
+    }
+
+
+    public void poll()  {
+        try {
+            LinkedList<Table> tableLinkedList = new LinkedList<>();
+
+            for (Map.Entry<String, Database> entry : schema.getDbMap().entrySet()) {
+                String dbName = entry.getKey();
+                Iterator<Map.Entry<String, Table>> iterator = entry.getValue().getTableMap().entrySet().iterator();
+                while (iterator.hasNext()) {
+                    Map.Entry<String, Table> tableEntry = iterator.next();
+                    String tableName = tableEntry.getKey();
+                    Table table = tableEntry.getValue();
+                    Map<String, String> tableFilterMap = table.getFilterMap();
+
+
+                    Select selectFrom = QueryBuilder.selectFrom(dbName, tableName).all();
+                    if (tableFilterMap != null && !tableFilterMap.keySet().contains("NO-FILTER")){
+                        for (String key: tableFilterMap.keySet()) {
+                            String value = tableFilterMap.get(key);
+                            selectFrom.whereColumn(key).isEqualTo(QueryBuilder.literal(value));
+                        }
+                    }
+
+
+                    SimpleStatement stmt;
+                    boolean finishUpdate = false;
+                    log.info("trying to execute sql query,{}", selectFrom.asCql());
+                    ResultSet result = null;
+                    while (!cqlSession.isClosed() && !finishUpdate){
+                        stmt = selectFrom.build();
+                        result = cqlSession.execute(stmt);
+                        if (result.wasApplied()) {
+                            log.info("query columns success, executed cql query {}", selectFrom.asCql());
+                        }
+                        finishUpdate = true;
+                    }
+
+                    List<String> colList = tableEntry.getValue().getColList();
+                    List<String> dataTypeList = tableEntry.getValue().getRawDataTypeList();
+                    List<ColumnParser> parserList = tableEntry.getValue().getParserList();
+
+                    for (Row row : result) {
+                        Table tableWithData = new Table(dbName, tableName);
+                        tableWithData.setColList(colList);
+                        tableWithData.setRawDataTypeList(dataTypeList);
+                        tableWithData.setParserList(parserList);
+
+                        for (String col : colList) {
+                            tableWithData.getDataList().add(row.getObject(col));
+                        }
+                        tableLinkedList.add(tableWithData);
+                    }
+                }
+            }
+            list = tableLinkedList;
+        } catch (Exception e) {
+            log.error("fail to poll data, {}", e);
+        }
+
+    }
+
+    public void start() throws Exception {
+        String whiteDataBases = config.getWhiteDataBase();
+        JSONObject whiteDataBaseObject = JSONObject.parseObject(whiteDataBases);
+
+        if (whiteDataBaseObject != null){
+            for (String whiteDataBaseName : whiteDataBaseObject.keySet()){
+                JSONObject whiteTableObject = (JSONObject)whiteDataBaseObject.get(whiteDataBaseName);
+                HashSet<String> whiteTableSet = new HashSet<>();
+                for (String whiteTableName : whiteTableObject.keySet()){
+                    Collections.addAll(whiteTableSet, whiteTableName);
+                    HashMap<String, String> filterMap = new HashMap<>();
+                    JSONObject tableFilterObject = JSONObject.parseObject(whiteTableObject.get(whiteTableName).toString());
+                    for(String filterKey : tableFilterObject.keySet()){
+                        filterMap.put(filterKey, tableFilterObject.getString(filterKey));
+                    }
+                    schema.tableFilterMap.put(whiteTableName, filterMap);
+                }
+                schema.dbTableMap.put(whiteDataBaseName, whiteTableSet);
+            }
+        }
+        schema.load();
+        log.info("load schema success");
+    }
+}
diff --git a/src/main/java/org/apache/rocketmq/connect/cassandra/strategy/DivideStrategyEnum.java b/src/main/java/org/apache/rocketmq/connect/cassandra/strategy/DivideStrategyEnum.java
new file mode 100644
index 0000000..f0eac2b
--- /dev/null
+++ b/src/main/java/org/apache/rocketmq/connect/cassandra/strategy/DivideStrategyEnum.java
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.connect.cassandra.strategy;
+
+public enum DivideStrategyEnum {
+
+    BY_TOPIC,
+    BY_QUEUE
+}
diff --git a/src/main/java/org/apache/rocketmq/connect/cassandra/strategy/DivideTaskByTopic.java b/src/main/java/org/apache/rocketmq/connect/cassandra/strategy/DivideTaskByTopic.java
new file mode 100644
index 0000000..68395d3
--- /dev/null
+++ b/src/main/java/org/apache/rocketmq/connect/cassandra/strategy/DivideTaskByTopic.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.connect.cassandra.strategy;
+
+import com.alibaba.fastjson.JSONObject;
+import io.openmessaging.KeyValue;
+import io.openmessaging.connector.api.Task;
+import io.openmessaging.internal.DefaultKeyValue;
+import org.apache.rocketmq.connect.cassandra.config.*;
+
+
+import java.util.*;
+
+public class DivideTaskByTopic extends TaskDivideStrategy {
+    @Override
+    public List<KeyValue> divide(DbConnectorConfig dbConnectorConfig, TaskDivideConfig tdc) {
+        if (dbConnectorConfig instanceof SourceDbConnectorConfig){
+            return divideSourceTaskByTopic(dbConnectorConfig, tdc);
+        }else {
+            return divideSinkTaskByTopic(dbConnectorConfig, tdc);
+        }
+    }
+
+    private List<KeyValue> divideSinkTaskByTopic(DbConnectorConfig dbConnectorConfig, TaskDivideConfig tdc) {
+        List<KeyValue> config = new ArrayList<KeyValue>();
+        int parallelism = tdc.getTaskParallelism();
+        int id = -1;
+        Set<String> topicRouteSet = ((SinkDbConnectorConfig)dbConnectorConfig).getWhiteTopics();
+        Map<Integer, StringBuilder> taskTopicList = new HashMap<>();
+        for (String topicName : topicRouteSet) {
+            int ind = ++id % parallelism;
+            if (!taskTopicList.containsKey(ind)) {
+                taskTopicList.put(ind, new StringBuilder(topicName));
+            } else {
+                taskTopicList.get(ind).append(",").append(topicName);
+            }
+        }
+
+        for (int i = 0; i < parallelism; i++) {
+            KeyValue keyValue = new DefaultKeyValue();
+            keyValue.put(Config.CONN_DB_IP, tdc.getDbUrl());
+            keyValue.put(Config.CONN_DB_PORT, tdc.getDbPort());
+            keyValue.put(Config.CONN_DB_USERNAME, tdc.getDbUserName());
+            keyValue.put(Config.CONN_DB_PASSWORD, tdc.getDbPassword());
+            keyValue.put(Config.CONN_DB_DATACENTER, tdc.getLocalDataCenter());
+            keyValue.put(Config.CONN_TOPIC_NAMES, taskTopicList.get(i).toString());
+            keyValue.put(Config.CONN_DATA_TYPE, tdc.getDataType());
+            keyValue.put(Config.CONN_SOURCE_RECORD_CONVERTER, tdc.getSrcRecordConverter());
+            keyValue.put(Config.CONN_DB_MODE, tdc.getMode());
+            config.add(keyValue);
+        }
+
+        return config;
+    }
+
+    private List<KeyValue> divideSourceTaskByTopic(DbConnectorConfig dbConnectorConfig, TaskDivideConfig tdc) {
+        List<KeyValue> config = new ArrayList<KeyValue>();
+        int parallelism = tdc.getTaskParallelism();
+        int id = -1;
+        Map<String, String> topicRouteMap = ((SourceDbConnectorConfig)dbConnectorConfig).getWhiteTopics();
+        Map<Integer, Map<String, Map<String, String>>> taskTopicList = new HashMap<>();
+        for (Map.Entry<String, String> entry : topicRouteMap.entrySet()) {
+            int ind = ++id % parallelism;
+            if (!taskTopicList.containsKey(ind)) {
+                taskTopicList.put(ind, new HashMap<>());
+            }
+            String dbKey = entry.getKey().split("-")[0];
+            String tableKey = entry.getKey().split("-")[1];
+            String filter = entry.getValue();
+            Map<String, String> tableMap = new HashMap<>();
+            tableMap.put(tableKey, filter);
+            if(!taskTopicList.get(ind).containsKey(dbKey)){
+                taskTopicList.get(ind).put(dbKey, tableMap);
+            }else {
+                taskTopicList.get(ind).get(dbKey).putAll(tableMap);
+            }
+        }
+
+        for (int i = 0; i < parallelism; i++) {
+            KeyValue keyValue = new DefaultKeyValue();
+
+            keyValue.put(Config.CONN_DB_IP, tdc.getDbUrl());
+            keyValue.put(Config.CONN_DB_PORT, tdc.getDbPort());
+            keyValue.put(Config.CONN_DB_USERNAME, tdc.getDbUserName());
+            keyValue.put(Config.CONN_DB_PASSWORD, tdc.getDbPassword());
+            keyValue.put(Config.CONN_DB_DATACENTER, tdc.getLocalDataCenter());
+            keyValue.put(Config.CONN_WHITE_LIST, JSONObject.toJSONString(taskTopicList.get(i)));
+            keyValue.put(Config.CONN_DATA_TYPE, tdc.getDataType());
+            keyValue.put(Config.CONN_SOURCE_RECORD_CONVERTER, tdc.getSrcRecordConverter());
+            keyValue.put(Config.CONN_DB_MODE, tdc.getMode());
+            config.add(keyValue);
+        }
+
+        return config;
+    }
+}
diff --git a/src/main/java/org/apache/rocketmq/connect/cassandra/strategy/TaskDivideStrategy.java b/src/main/java/org/apache/rocketmq/connect/cassandra/strategy/TaskDivideStrategy.java
new file mode 100644
index 0000000..70a6773
--- /dev/null
+++ b/src/main/java/org/apache/rocketmq/connect/cassandra/strategy/TaskDivideStrategy.java
@@ -0,0 +1,32 @@
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.connect.cassandra.strategy;
+
+import io.openmessaging.KeyValue;
+import io.openmessaging.connector.api.Task;
+import org.apache.rocketmq.connect.cassandra.config.DbConnectorConfig;
+import org.apache.rocketmq.connect.cassandra.config.TaskDivideConfig;
+import org.apache.rocketmq.connect.cassandra.config.TaskTopicInfo;
+
+import java.util.List;
+import java.util.Map;
+
+public abstract class TaskDivideStrategy {
+    public abstract List<KeyValue> divide(DbConnectorConfig dbConnectorConfig, TaskDivideConfig tdc);
+}

[rocketmq-connect] 04/05: Add 'connector/rocketmq-connect-cassandra/' from commit 'e2cc843ef4926a98797ca76880579941d5363fc6'

Posted by zh...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

zhoubo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-connect.git

commit fafa276619b0b6291f0567465590adee55619c8c
Merge: 0669f46 e2cc843
Author: odbozhou <87...@qq.com>
AuthorDate: Wed Mar 2 11:17:21 2022 +0800

    Add 'connector/rocketmq-connect-cassandra/' from commit 'e2cc843ef4926a98797ca76880579941d5363fc6'
    
    git-subtree-dir: connector/rocketmq-connect-cassandra
    git-subtree-mainline: 0669f46a5018a92c4ad08fc6704dfa6606eb78ab
    git-subtree-split: e2cc843ef4926a98797ca76880579941d5363fc6

 connector/rocketmq-connect-cassandra/README.md     |  97 +++++++
 connector/rocketmq-connect-cassandra/pom.xml       | 276 ++++++++++++++++++++
 .../rocketmq-connect-cassandra/scripts/gen_data.py | 155 +++++++++++
 .../scripts/requirements.txt                       |   2 +
 .../connect/cassandra/common/CloneUtils.java       |  44 ++++
 .../connect/cassandra/common/ConstDefine.java      |  23 ++
 .../rocketmq/connect/cassandra/common/DBUtils.java |  91 +++++++
 .../connect/cassandra/common/DataType.java         |  26 ++
 .../rocketmq/connect/cassandra/common/Utils.java   |  76 ++++++
 .../rocketmq/connect/cassandra/config/Config.java  | 282 +++++++++++++++++++++
 .../connect/cassandra/config/ConfigUtil.java       |  70 +++++
 .../cassandra/config/DbConnectorConfig.java        | 110 ++++++++
 .../cassandra/config/SinkDbConnectorConfig.java    | 112 ++++++++
 .../cassandra/config/SourceDbConnectorConfig.java  |  87 +++++++
 .../connect/cassandra/config/TaskDivideConfig.java | 123 +++++++++
 .../connect/cassandra/config/TaskTopicInfo.java    |  40 +++
 .../connector/CassandraSinkConnector.java          | 240 ++++++++++++++++++
 .../cassandra/connector/CassandraSinkTask.java     | 161 ++++++++++++
 .../connector/CassandraSourceConnector.java        | 108 ++++++++
 .../cassandra/connector/CassandraSourceTask.java   | 168 ++++++++++++
 .../connect/cassandra/schema/Database.java         | 140 ++++++++++
 .../rocketmq/connect/cassandra/schema/Schema.java  | 146 +++++++++++
 .../rocketmq/connect/cassandra/schema/Table.java   | 103 ++++++++
 .../schema/column/BigIntColumnParser.java          |  50 ++++
 .../schema/column/BooleanColumnParser.java         |  34 +++
 .../cassandra/schema/column/ColumnParser.java      | 118 +++++++++
 .../schema/column/DateTimeColumnParser.java        |  53 ++++
 .../schema/column/DefaultColumnParser.java         |  37 +++
 .../cassandra/schema/column/EnumColumnParser.java  |  46 ++++
 .../cassandra/schema/column/IntColumnParser.java   |  66 +++++
 .../cassandra/schema/column/SetColumnParser.java   |  54 ++++
 .../schema/column/StringColumnParser.java          |  57 +++++
 .../cassandra/schema/column/TimeColumnParser.java  |  39 +++
 .../cassandra/schema/column/YearColumnParser.java  |  40 +++
 .../rocketmq/connect/cassandra/sink/Updater.java   | 216 ++++++++++++++++
 .../rocketmq/connect/cassandra/source/Querier.java | 164 ++++++++++++
 .../cassandra/strategy/DivideStrategyEnum.java     |  23 ++
 .../cassandra/strategy/DivideTaskByTopic.java      | 110 ++++++++
 .../cassandra/strategy/TaskDivideStrategy.java     |  32 +++
 39 files changed, 3819 insertions(+)

diff --cc connector/rocketmq-connect-cassandra/README.md
index 0000000,6439269..6439269
mode 000000,100644..100644
--- a/connector/rocketmq-connect-cassandra/README.md
+++ b/connector/rocketmq-connect-cassandra/README.md
diff --cc connector/rocketmq-connect-cassandra/pom.xml
index 0000000,286a6ef..286a6ef
mode 000000,100644..100644
--- a/connector/rocketmq-connect-cassandra/pom.xml
+++ b/connector/rocketmq-connect-cassandra/pom.xml
diff --cc connector/rocketmq-connect-cassandra/scripts/gen_data.py
index 0000000,8fde504..8fde504
mode 000000,100644..100644
--- a/connector/rocketmq-connect-cassandra/scripts/gen_data.py
+++ b/connector/rocketmq-connect-cassandra/scripts/gen_data.py
diff --cc connector/rocketmq-connect-cassandra/scripts/requirements.txt
index 0000000,fc7dc31..fc7dc31
mode 000000,100644..100644
--- a/connector/rocketmq-connect-cassandra/scripts/requirements.txt
+++ b/connector/rocketmq-connect-cassandra/scripts/requirements.txt
diff --cc connector/rocketmq-connect-cassandra/src/main/java/org/apache/rocketmq/connect/cassandra/common/CloneUtils.java
index 0000000,c860750..c860750
mode 000000,100644..100644
--- a/connector/rocketmq-connect-cassandra/src/main/java/org/apache/rocketmq/connect/cassandra/common/CloneUtils.java
+++ b/connector/rocketmq-connect-cassandra/src/main/java/org/apache/rocketmq/connect/cassandra/common/CloneUtils.java
diff --cc connector/rocketmq-connect-cassandra/src/main/java/org/apache/rocketmq/connect/cassandra/common/ConstDefine.java
index 0000000,462add2..462add2
mode 000000,100644..100644
--- a/connector/rocketmq-connect-cassandra/src/main/java/org/apache/rocketmq/connect/cassandra/common/ConstDefine.java
+++ b/connector/rocketmq-connect-cassandra/src/main/java/org/apache/rocketmq/connect/cassandra/common/ConstDefine.java
diff --cc connector/rocketmq-connect-cassandra/src/main/java/org/apache/rocketmq/connect/cassandra/common/DBUtils.java
index 0000000,bd58eea..bd58eea
mode 000000,100644..100644
--- a/connector/rocketmq-connect-cassandra/src/main/java/org/apache/rocketmq/connect/cassandra/common/DBUtils.java
+++ b/connector/rocketmq-connect-cassandra/src/main/java/org/apache/rocketmq/connect/cassandra/common/DBUtils.java
diff --cc connector/rocketmq-connect-cassandra/src/main/java/org/apache/rocketmq/connect/cassandra/common/DataType.java
index 0000000,d6f814f..d6f814f
mode 000000,100644..100644
--- a/connector/rocketmq-connect-cassandra/src/main/java/org/apache/rocketmq/connect/cassandra/common/DataType.java
+++ b/connector/rocketmq-connect-cassandra/src/main/java/org/apache/rocketmq/connect/cassandra/common/DataType.java
diff --cc connector/rocketmq-connect-cassandra/src/main/java/org/apache/rocketmq/connect/cassandra/common/Utils.java
index 0000000,0911e20..0911e20
mode 000000,100644..100644
--- a/connector/rocketmq-connect-cassandra/src/main/java/org/apache/rocketmq/connect/cassandra/common/Utils.java
+++ b/connector/rocketmq-connect-cassandra/src/main/java/org/apache/rocketmq/connect/cassandra/common/Utils.java
diff --cc connector/rocketmq-connect-cassandra/src/main/java/org/apache/rocketmq/connect/cassandra/config/Config.java
index 0000000,b9b115e..b9b115e
mode 000000,100644..100644
--- a/connector/rocketmq-connect-cassandra/src/main/java/org/apache/rocketmq/connect/cassandra/config/Config.java
+++ b/connector/rocketmq-connect-cassandra/src/main/java/org/apache/rocketmq/connect/cassandra/config/Config.java
diff --cc connector/rocketmq-connect-cassandra/src/main/java/org/apache/rocketmq/connect/cassandra/config/ConfigUtil.java
index 0000000,1c08fb2..1c08fb2
mode 000000,100644..100644
--- a/connector/rocketmq-connect-cassandra/src/main/java/org/apache/rocketmq/connect/cassandra/config/ConfigUtil.java
+++ b/connector/rocketmq-connect-cassandra/src/main/java/org/apache/rocketmq/connect/cassandra/config/ConfigUtil.java
diff --cc connector/rocketmq-connect-cassandra/src/main/java/org/apache/rocketmq/connect/cassandra/config/DbConnectorConfig.java
index 0000000,3dd25c0..3dd25c0
mode 000000,100644..100644
--- a/connector/rocketmq-connect-cassandra/src/main/java/org/apache/rocketmq/connect/cassandra/config/DbConnectorConfig.java
+++ b/connector/rocketmq-connect-cassandra/src/main/java/org/apache/rocketmq/connect/cassandra/config/DbConnectorConfig.java
diff --cc connector/rocketmq-connect-cassandra/src/main/java/org/apache/rocketmq/connect/cassandra/config/SinkDbConnectorConfig.java
index 0000000,3145033..3145033
mode 000000,100644..100644
--- a/connector/rocketmq-connect-cassandra/src/main/java/org/apache/rocketmq/connect/cassandra/config/SinkDbConnectorConfig.java
+++ b/connector/rocketmq-connect-cassandra/src/main/java/org/apache/rocketmq/connect/cassandra/config/SinkDbConnectorConfig.java
diff --cc connector/rocketmq-connect-cassandra/src/main/java/org/apache/rocketmq/connect/cassandra/config/SourceDbConnectorConfig.java
index 0000000,6a3f685..6a3f685
mode 000000,100644..100644
--- a/connector/rocketmq-connect-cassandra/src/main/java/org/apache/rocketmq/connect/cassandra/config/SourceDbConnectorConfig.java
+++ b/connector/rocketmq-connect-cassandra/src/main/java/org/apache/rocketmq/connect/cassandra/config/SourceDbConnectorConfig.java
diff --cc connector/rocketmq-connect-cassandra/src/main/java/org/apache/rocketmq/connect/cassandra/config/TaskDivideConfig.java
index 0000000,7c43137..7c43137
mode 000000,100644..100644
--- a/connector/rocketmq-connect-cassandra/src/main/java/org/apache/rocketmq/connect/cassandra/config/TaskDivideConfig.java
+++ b/connector/rocketmq-connect-cassandra/src/main/java/org/apache/rocketmq/connect/cassandra/config/TaskDivideConfig.java
diff --cc connector/rocketmq-connect-cassandra/src/main/java/org/apache/rocketmq/connect/cassandra/config/TaskTopicInfo.java
index 0000000,074faab..074faab
mode 000000,100644..100644
--- a/connector/rocketmq-connect-cassandra/src/main/java/org/apache/rocketmq/connect/cassandra/config/TaskTopicInfo.java
+++ b/connector/rocketmq-connect-cassandra/src/main/java/org/apache/rocketmq/connect/cassandra/config/TaskTopicInfo.java
diff --cc connector/rocketmq-connect-cassandra/src/main/java/org/apache/rocketmq/connect/cassandra/connector/CassandraSinkConnector.java
index 0000000,6ce23f6..6ce23f6
mode 000000,100644..100644
--- a/connector/rocketmq-connect-cassandra/src/main/java/org/apache/rocketmq/connect/cassandra/connector/CassandraSinkConnector.java
+++ b/connector/rocketmq-connect-cassandra/src/main/java/org/apache/rocketmq/connect/cassandra/connector/CassandraSinkConnector.java
diff --cc connector/rocketmq-connect-cassandra/src/main/java/org/apache/rocketmq/connect/cassandra/connector/CassandraSinkTask.java
index 0000000,a8e9b0a..a8e9b0a
mode 000000,100644..100644
--- a/connector/rocketmq-connect-cassandra/src/main/java/org/apache/rocketmq/connect/cassandra/connector/CassandraSinkTask.java
+++ b/connector/rocketmq-connect-cassandra/src/main/java/org/apache/rocketmq/connect/cassandra/connector/CassandraSinkTask.java
diff --cc connector/rocketmq-connect-cassandra/src/main/java/org/apache/rocketmq/connect/cassandra/connector/CassandraSourceConnector.java
index 0000000,a8adc74..a8adc74
mode 000000,100644..100644
--- a/connector/rocketmq-connect-cassandra/src/main/java/org/apache/rocketmq/connect/cassandra/connector/CassandraSourceConnector.java
+++ b/connector/rocketmq-connect-cassandra/src/main/java/org/apache/rocketmq/connect/cassandra/connector/CassandraSourceConnector.java
diff --cc connector/rocketmq-connect-cassandra/src/main/java/org/apache/rocketmq/connect/cassandra/connector/CassandraSourceTask.java
index 0000000,cac44ed..cac44ed
mode 000000,100644..100644
--- a/connector/rocketmq-connect-cassandra/src/main/java/org/apache/rocketmq/connect/cassandra/connector/CassandraSourceTask.java
+++ b/connector/rocketmq-connect-cassandra/src/main/java/org/apache/rocketmq/connect/cassandra/connector/CassandraSourceTask.java
diff --cc connector/rocketmq-connect-cassandra/src/main/java/org/apache/rocketmq/connect/cassandra/schema/Database.java
index 0000000,c8f69e6..c8f69e6
mode 000000,100644..100644
--- a/connector/rocketmq-connect-cassandra/src/main/java/org/apache/rocketmq/connect/cassandra/schema/Database.java
+++ b/connector/rocketmq-connect-cassandra/src/main/java/org/apache/rocketmq/connect/cassandra/schema/Database.java
diff --cc connector/rocketmq-connect-cassandra/src/main/java/org/apache/rocketmq/connect/cassandra/schema/Schema.java
index 0000000,01054e0..01054e0
mode 000000,100644..100644
--- a/connector/rocketmq-connect-cassandra/src/main/java/org/apache/rocketmq/connect/cassandra/schema/Schema.java
+++ b/connector/rocketmq-connect-cassandra/src/main/java/org/apache/rocketmq/connect/cassandra/schema/Schema.java
diff --cc connector/rocketmq-connect-cassandra/src/main/java/org/apache/rocketmq/connect/cassandra/schema/Table.java
index 0000000,902b797..902b797
mode 000000,100644..100644
--- a/connector/rocketmq-connect-cassandra/src/main/java/org/apache/rocketmq/connect/cassandra/schema/Table.java
+++ b/connector/rocketmq-connect-cassandra/src/main/java/org/apache/rocketmq/connect/cassandra/schema/Table.java
diff --cc connector/rocketmq-connect-cassandra/src/main/java/org/apache/rocketmq/connect/cassandra/schema/column/BigIntColumnParser.java
index 0000000,0506469..0506469
mode 000000,100644..100644
--- a/connector/rocketmq-connect-cassandra/src/main/java/org/apache/rocketmq/connect/cassandra/schema/column/BigIntColumnParser.java
+++ b/connector/rocketmq-connect-cassandra/src/main/java/org/apache/rocketmq/connect/cassandra/schema/column/BigIntColumnParser.java
diff --cc connector/rocketmq-connect-cassandra/src/main/java/org/apache/rocketmq/connect/cassandra/schema/column/BooleanColumnParser.java
index 0000000,8d5b2bb..8d5b2bb
mode 000000,100644..100644
--- a/connector/rocketmq-connect-cassandra/src/main/java/org/apache/rocketmq/connect/cassandra/schema/column/BooleanColumnParser.java
+++ b/connector/rocketmq-connect-cassandra/src/main/java/org/apache/rocketmq/connect/cassandra/schema/column/BooleanColumnParser.java
diff --cc connector/rocketmq-connect-cassandra/src/main/java/org/apache/rocketmq/connect/cassandra/schema/column/ColumnParser.java
index 0000000,1eab587..1eab587
mode 000000,100644..100644
--- a/connector/rocketmq-connect-cassandra/src/main/java/org/apache/rocketmq/connect/cassandra/schema/column/ColumnParser.java
+++ b/connector/rocketmq-connect-cassandra/src/main/java/org/apache/rocketmq/connect/cassandra/schema/column/ColumnParser.java
diff --cc connector/rocketmq-connect-cassandra/src/main/java/org/apache/rocketmq/connect/cassandra/schema/column/DateTimeColumnParser.java
index 0000000,b110a19..b110a19
mode 000000,100644..100644
--- a/connector/rocketmq-connect-cassandra/src/main/java/org/apache/rocketmq/connect/cassandra/schema/column/DateTimeColumnParser.java
+++ b/connector/rocketmq-connect-cassandra/src/main/java/org/apache/rocketmq/connect/cassandra/schema/column/DateTimeColumnParser.java
diff --cc connector/rocketmq-connect-cassandra/src/main/java/org/apache/rocketmq/connect/cassandra/schema/column/DefaultColumnParser.java
index 0000000,01d8d1a..01d8d1a
mode 000000,100644..100644
--- a/connector/rocketmq-connect-cassandra/src/main/java/org/apache/rocketmq/connect/cassandra/schema/column/DefaultColumnParser.java
+++ b/connector/rocketmq-connect-cassandra/src/main/java/org/apache/rocketmq/connect/cassandra/schema/column/DefaultColumnParser.java
diff --cc connector/rocketmq-connect-cassandra/src/main/java/org/apache/rocketmq/connect/cassandra/schema/column/EnumColumnParser.java
index 0000000,245dfd6..245dfd6
mode 000000,100644..100644
--- a/connector/rocketmq-connect-cassandra/src/main/java/org/apache/rocketmq/connect/cassandra/schema/column/EnumColumnParser.java
+++ b/connector/rocketmq-connect-cassandra/src/main/java/org/apache/rocketmq/connect/cassandra/schema/column/EnumColumnParser.java
diff --cc connector/rocketmq-connect-cassandra/src/main/java/org/apache/rocketmq/connect/cassandra/schema/column/IntColumnParser.java
index 0000000,2257682..2257682
mode 000000,100644..100644
--- a/connector/rocketmq-connect-cassandra/src/main/java/org/apache/rocketmq/connect/cassandra/schema/column/IntColumnParser.java
+++ b/connector/rocketmq-connect-cassandra/src/main/java/org/apache/rocketmq/connect/cassandra/schema/column/IntColumnParser.java
diff --cc connector/rocketmq-connect-cassandra/src/main/java/org/apache/rocketmq/connect/cassandra/schema/column/SetColumnParser.java
index 0000000,eaa6dad..eaa6dad
mode 000000,100644..100644
--- a/connector/rocketmq-connect-cassandra/src/main/java/org/apache/rocketmq/connect/cassandra/schema/column/SetColumnParser.java
+++ b/connector/rocketmq-connect-cassandra/src/main/java/org/apache/rocketmq/connect/cassandra/schema/column/SetColumnParser.java
diff --cc connector/rocketmq-connect-cassandra/src/main/java/org/apache/rocketmq/connect/cassandra/schema/column/StringColumnParser.java
index 0000000,2bd7a36..2bd7a36
mode 000000,100644..100644
--- a/connector/rocketmq-connect-cassandra/src/main/java/org/apache/rocketmq/connect/cassandra/schema/column/StringColumnParser.java
+++ b/connector/rocketmq-connect-cassandra/src/main/java/org/apache/rocketmq/connect/cassandra/schema/column/StringColumnParser.java
diff --cc connector/rocketmq-connect-cassandra/src/main/java/org/apache/rocketmq/connect/cassandra/schema/column/TimeColumnParser.java
index 0000000,c812a53..c812a53
mode 000000,100644..100644
--- a/connector/rocketmq-connect-cassandra/src/main/java/org/apache/rocketmq/connect/cassandra/schema/column/TimeColumnParser.java
+++ b/connector/rocketmq-connect-cassandra/src/main/java/org/apache/rocketmq/connect/cassandra/schema/column/TimeColumnParser.java
diff --cc connector/rocketmq-connect-cassandra/src/main/java/org/apache/rocketmq/connect/cassandra/schema/column/YearColumnParser.java
index 0000000,82d61a8..82d61a8
mode 000000,100644..100644
--- a/connector/rocketmq-connect-cassandra/src/main/java/org/apache/rocketmq/connect/cassandra/schema/column/YearColumnParser.java
+++ b/connector/rocketmq-connect-cassandra/src/main/java/org/apache/rocketmq/connect/cassandra/schema/column/YearColumnParser.java
diff --cc connector/rocketmq-connect-cassandra/src/main/java/org/apache/rocketmq/connect/cassandra/sink/Updater.java
index 0000000,1c3f2c1..1c3f2c1
mode 000000,100644..100644
--- a/connector/rocketmq-connect-cassandra/src/main/java/org/apache/rocketmq/connect/cassandra/sink/Updater.java
+++ b/connector/rocketmq-connect-cassandra/src/main/java/org/apache/rocketmq/connect/cassandra/sink/Updater.java
diff --cc connector/rocketmq-connect-cassandra/src/main/java/org/apache/rocketmq/connect/cassandra/source/Querier.java
index 0000000,8a8ddf4..8a8ddf4
mode 000000,100644..100644
--- a/connector/rocketmq-connect-cassandra/src/main/java/org/apache/rocketmq/connect/cassandra/source/Querier.java
+++ b/connector/rocketmq-connect-cassandra/src/main/java/org/apache/rocketmq/connect/cassandra/source/Querier.java
diff --cc connector/rocketmq-connect-cassandra/src/main/java/org/apache/rocketmq/connect/cassandra/strategy/DivideStrategyEnum.java
index 0000000,f0eac2b..f0eac2b
mode 000000,100644..100644
--- a/connector/rocketmq-connect-cassandra/src/main/java/org/apache/rocketmq/connect/cassandra/strategy/DivideStrategyEnum.java
+++ b/connector/rocketmq-connect-cassandra/src/main/java/org/apache/rocketmq/connect/cassandra/strategy/DivideStrategyEnum.java
diff --cc connector/rocketmq-connect-cassandra/src/main/java/org/apache/rocketmq/connect/cassandra/strategy/DivideTaskByTopic.java
index 0000000,68395d3..68395d3
mode 000000,100644..100644
--- a/connector/rocketmq-connect-cassandra/src/main/java/org/apache/rocketmq/connect/cassandra/strategy/DivideTaskByTopic.java
+++ b/connector/rocketmq-connect-cassandra/src/main/java/org/apache/rocketmq/connect/cassandra/strategy/DivideTaskByTopic.java
diff --cc connector/rocketmq-connect-cassandra/src/main/java/org/apache/rocketmq/connect/cassandra/strategy/TaskDivideStrategy.java
index 0000000,70a6773..70a6773
mode 000000,100644..100644
--- a/connector/rocketmq-connect-cassandra/src/main/java/org/apache/rocketmq/connect/cassandra/strategy/TaskDivideStrategy.java
+++ b/connector/rocketmq-connect-cassandra/src/main/java/org/apache/rocketmq/connect/cassandra/strategy/TaskDivideStrategy.java

[rocketmq-connect] 05/05: Add 'connector/rocketmq-connect-hudi/' from commit '5da4b78705108ac6d260283cd38f9be08d2590b9'

Posted by zh...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

zhoubo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-connect.git

commit 386adcc43bf32d201d8ea21ba9abfc653ef4ad17
Merge: fafa276 5da4b78
Author: odbozhou <87...@qq.com>
AuthorDate: Wed Mar 2 11:18:32 2022 +0800

    Add 'connector/rocketmq-connect-hudi/' from commit '5da4b78705108ac6d260283cd38f9be08d2590b9'
    
    git-subtree-dir: connector/rocketmq-connect-hudi
    git-subtree-mainline: fafa276619b0b6291f0567465590adee55619c8c
    git-subtree-split: 5da4b78705108ac6d260283cd38f9be08d2590b9

 connector/rocketmq-connect-hudi/README.md          |  77 ++++++
 connector/rocketmq-connect-hudi/pom.xml            | 287 +++++++++++++++++++++
 .../rocketmq/connect/hudi/config/CloneUtils.java   |  50 ++++
 .../rocketmq/connect/hudi/config/ConfigUtil.java   |  70 +++++
 .../connect/hudi/config/HudiConnectConfig.java     | 173 +++++++++++++
 .../connect/hudi/config/SinkConnectConfig.java     | 139 ++++++++++
 .../apache/rocketmq/connect/hudi/config/Utils.java |  75 ++++++
 .../connect/hudi/connector/HudiSinkConnector.java  | 250 ++++++++++++++++++
 .../connect/hudi/connector/HudiSinkTask.java       | 111 ++++++++
 .../apache/rocketmq/connect/hudi/sink/Updater.java | 239 +++++++++++++++++
 .../connect/hudi/strategy/ITaskDivideStrategy.java |  27 ++
 .../hudi/strategy/TaskDivideByQueueStrategy.java   |  80 ++++++
 .../hudi/strategy/TaskDivideStrategyFactory.java   |  25 ++
 .../rocketmq-connect-hudi/style/rmq_checkstyle.xml | 135 ++++++++++
 14 files changed, 1738 insertions(+)

diff --cc connector/rocketmq-connect-hudi/README.md
index 0000000,cadc364..cadc364
mode 000000,100644..100644
--- a/connector/rocketmq-connect-hudi/README.md
+++ b/connector/rocketmq-connect-hudi/README.md
diff --cc connector/rocketmq-connect-hudi/pom.xml
index 0000000,97c8785..97c8785
mode 000000,100644..100644
--- a/connector/rocketmq-connect-hudi/pom.xml
+++ b/connector/rocketmq-connect-hudi/pom.xml
diff --cc connector/rocketmq-connect-hudi/src/main/java/org/apache/rocketmq/connect/hudi/config/CloneUtils.java
index 0000000,dc3605d..dc3605d
mode 000000,100644..100644
--- a/connector/rocketmq-connect-hudi/src/main/java/org/apache/rocketmq/connect/hudi/config/CloneUtils.java
+++ b/connector/rocketmq-connect-hudi/src/main/java/org/apache/rocketmq/connect/hudi/config/CloneUtils.java
diff --cc connector/rocketmq-connect-hudi/src/main/java/org/apache/rocketmq/connect/hudi/config/ConfigUtil.java
index 0000000,88f8a8e..88f8a8e
mode 000000,100644..100644
--- a/connector/rocketmq-connect-hudi/src/main/java/org/apache/rocketmq/connect/hudi/config/ConfigUtil.java
+++ b/connector/rocketmq-connect-hudi/src/main/java/org/apache/rocketmq/connect/hudi/config/ConfigUtil.java
diff --cc connector/rocketmq-connect-hudi/src/main/java/org/apache/rocketmq/connect/hudi/config/HudiConnectConfig.java
index 0000000,4c04605..4c04605
mode 000000,100644..100644
--- a/connector/rocketmq-connect-hudi/src/main/java/org/apache/rocketmq/connect/hudi/config/HudiConnectConfig.java
+++ b/connector/rocketmq-connect-hudi/src/main/java/org/apache/rocketmq/connect/hudi/config/HudiConnectConfig.java
diff --cc connector/rocketmq-connect-hudi/src/main/java/org/apache/rocketmq/connect/hudi/config/SinkConnectConfig.java
index 0000000,943df40..943df40
mode 000000,100644..100644
--- a/connector/rocketmq-connect-hudi/src/main/java/org/apache/rocketmq/connect/hudi/config/SinkConnectConfig.java
+++ b/connector/rocketmq-connect-hudi/src/main/java/org/apache/rocketmq/connect/hudi/config/SinkConnectConfig.java
diff --cc connector/rocketmq-connect-hudi/src/main/java/org/apache/rocketmq/connect/hudi/config/Utils.java
index 0000000,d9bc6fe..d9bc6fe
mode 000000,100644..100644
--- a/connector/rocketmq-connect-hudi/src/main/java/org/apache/rocketmq/connect/hudi/config/Utils.java
+++ b/connector/rocketmq-connect-hudi/src/main/java/org/apache/rocketmq/connect/hudi/config/Utils.java
diff --cc connector/rocketmq-connect-hudi/src/main/java/org/apache/rocketmq/connect/hudi/connector/HudiSinkConnector.java
index 0000000,a496418..a496418
mode 000000,100644..100644
--- a/connector/rocketmq-connect-hudi/src/main/java/org/apache/rocketmq/connect/hudi/connector/HudiSinkConnector.java
+++ b/connector/rocketmq-connect-hudi/src/main/java/org/apache/rocketmq/connect/hudi/connector/HudiSinkConnector.java
diff --cc connector/rocketmq-connect-hudi/src/main/java/org/apache/rocketmq/connect/hudi/connector/HudiSinkTask.java
index 0000000,b01c660..b01c660
mode 000000,100644..100644
--- a/connector/rocketmq-connect-hudi/src/main/java/org/apache/rocketmq/connect/hudi/connector/HudiSinkTask.java
+++ b/connector/rocketmq-connect-hudi/src/main/java/org/apache/rocketmq/connect/hudi/connector/HudiSinkTask.java
diff --cc connector/rocketmq-connect-hudi/src/main/java/org/apache/rocketmq/connect/hudi/sink/Updater.java
index 0000000,8e7e288..8e7e288
mode 000000,100644..100644
--- a/connector/rocketmq-connect-hudi/src/main/java/org/apache/rocketmq/connect/hudi/sink/Updater.java
+++ b/connector/rocketmq-connect-hudi/src/main/java/org/apache/rocketmq/connect/hudi/sink/Updater.java
diff --cc connector/rocketmq-connect-hudi/src/main/java/org/apache/rocketmq/connect/hudi/strategy/ITaskDivideStrategy.java
index 0000000,a91c066..a91c066
mode 000000,100644..100644
--- a/connector/rocketmq-connect-hudi/src/main/java/org/apache/rocketmq/connect/hudi/strategy/ITaskDivideStrategy.java
+++ b/connector/rocketmq-connect-hudi/src/main/java/org/apache/rocketmq/connect/hudi/strategy/ITaskDivideStrategy.java
diff --cc connector/rocketmq-connect-hudi/src/main/java/org/apache/rocketmq/connect/hudi/strategy/TaskDivideByQueueStrategy.java
index 0000000,c68e17c..c68e17c
mode 000000,100644..100644
--- a/connector/rocketmq-connect-hudi/src/main/java/org/apache/rocketmq/connect/hudi/strategy/TaskDivideByQueueStrategy.java
+++ b/connector/rocketmq-connect-hudi/src/main/java/org/apache/rocketmq/connect/hudi/strategy/TaskDivideByQueueStrategy.java
diff --cc connector/rocketmq-connect-hudi/src/main/java/org/apache/rocketmq/connect/hudi/strategy/TaskDivideStrategyFactory.java
index 0000000,1d693a8..1d693a8
mode 000000,100644..100644
--- a/connector/rocketmq-connect-hudi/src/main/java/org/apache/rocketmq/connect/hudi/strategy/TaskDivideStrategyFactory.java
+++ b/connector/rocketmq-connect-hudi/src/main/java/org/apache/rocketmq/connect/hudi/strategy/TaskDivideStrategyFactory.java
diff --cc connector/rocketmq-connect-hudi/style/rmq_checkstyle.xml
index 0000000,776b305..776b305
mode 000000,100644..100644
--- a/connector/rocketmq-connect-hudi/style/rmq_checkstyle.xml
+++ b/connector/rocketmq-connect-hudi/style/rmq_checkstyle.xml

[rocketmq-connect] 02/05: [ISSUE #801]Rocketmq connector sink for hudi (#800)

Posted by zh...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

zhoubo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-connect.git

commit 44bb9bd16d11575f881ade0793d858c12ce0911f
Author: lizhiboo <li...@yeah.net>
AuthorDate: Thu Sep 9 12:52:18 2021 +0800

    [ISSUE #801]Rocketmq connector sink for hudi (#800)
    
    * support rocketmq sink to hudi
    
    * support rocketmq sink to hudi debug
    
    * remove unused code
    
    * support task divide
    
    * support divide strategy by topic queue
    
    * support divide strategy by topic queue
    
    * add log4j.properties
    
    * upgrade javalin to 2.8.0
    
    * add log
    
    * add log
    
    * add log
    
    * add quick stat in READMQ.md
    
    * support start hudi sink by spark-submit
    
    * code style
    
    * code style
    
    * code style
    
    * code style
    
    * code style
    
    * code style
    
    * code style
---
 README.md                                          |  78 ++++++
 pom.xml                                            | 287 +++++++++++++++++++++
 .../rocketmq/connect/hudi/config/CloneUtils.java   |  50 ++++
 .../rocketmq/connect/hudi/config/ConfigUtil.java   |  70 +++++
 .../connect/hudi/config/HudiConnectConfig.java     | 173 +++++++++++++
 .../connect/hudi/config/SinkConnectConfig.java     | 139 ++++++++++
 .../apache/rocketmq/connect/hudi/config/Utils.java |  75 ++++++
 .../connect/hudi/connector/HudiSinkConnector.java  | 250 ++++++++++++++++++
 .../connect/hudi/connector/HudiSinkTask.java       | 111 ++++++++
 .../apache/rocketmq/connect/hudi/sink/Updater.java | 239 +++++++++++++++++
 .../connect/hudi/strategy/ITaskDivideStrategy.java |  27 ++
 .../hudi/strategy/TaskDivideByQueueStrategy.java   |  80 ++++++
 .../hudi/strategy/TaskDivideStrategyFactory.java   |  25 ++
 style/rmq_checkstyle.xml                           | 135 ++++++++++
 14 files changed, 1739 insertions(+)

diff --git a/README.md b/README.md
new file mode 100644
index 0000000..509f3b2
--- /dev/null
+++ b/README.md
@@ -0,0 +1,78 @@
+# rocketmq-connect-hudi
+
+## rocketmq-connect-hudi 打包
+```
+mvn clean install -DskipTest -U 
+```
+将target目录下打包的rocketmq-connect-hudi-0.0.1-SNAPSHOT-jar-with-dependencies.jar拷贝到connector-runtime connect.conf配置的connector-plugin目录下。
+## 目前安装会遇到的问题
+
+目前的rocketmq-connect-hudi 使用的是0.8.0版本的hudi.
+
+## rocketmq-connect-hudi 启动
+
+首先,需要启动connect-runtime,参考rocketmq-connect-runtime的run_work.sh脚本。
+* **hudi-sink-connector** 启动
+
+```
+http://${runtime-ip}:${runtime-port}/connectors/${rocketmq-hudi-sink-connector-name}
+?config='{"connector-class":"org.apache.rocketmq.connect.hudi.connector.HudiSinkConnector","topicNames":"topicc","tablePath":"file:///tmp/hudi_connector_test","tableName":"hudi_connector_test_table","insertShuffleParallelism":"2","upsertShuffleParallelism":"2","deleteParallelism":"2","source-record-converter":"org.apache.rocketmq.connect.runtime.converter.RocketMQConverter","source-rocketmq":"127.0.0.1:9876","src-cluster":"DefaultCluster","refresh-interval":"10000","schemaPath":"/Users/o [...]
+```
+启动成功会打印如下日志:
+```
+2021-09-06 16:23:14 INFO pool-2-thread-1 - Open HoodieJavaWriteClient successfully
+```
+>**注:** `rocketmq-hudi-connect` 的启动依赖于`rocketmq-connect-runtime`项目的启动,需将打好的`jar`包放置到`runtime`项目中`pluginPaths`配置的路径后再执行上面的启动请求,该值配置在`runtime`项目下的`connect.conf`文件中
+
+## rocketmq-connect-hudi 停止
+
+```
+http://${runtime-ip}:${runtime-port}/connectors/${rocketmq-hudi-connector-name}/stop
+```
+
+## rocketmq-connect-hudi 参数说明
+* **hudi-sink-connector 参数说明**
+
+参数 | 类型 | 是否必须 | 描述 | 样例
+|---|---|---|---|---|
+|connector-class | String | 是 | sink connector类 | HudiSinkConnector|
+|tablePath | String | 是 | sink到hudi的表路径 | file:///tmp/hudi_connector_test |
+|tableName | String | 是 | sink到hudi的表名称| hudi_connector_test_table |
+|insertShuffleParallelism | int | 是 | hudi insert并发度 | 2 |
+|upsertShuffleParallelism | int | 是 | hudi upsert并发度 | 2 |
+|deleteParallelism | int | 是 | hudi delete并发度 | 2 |
+|topicNames | String | 是 | rocketmq默认每一个数据源中的表对应一个名字,该名称需和数据库表名称相同 | jdbc_hudi |
+|task-divide-strategy | Integer | 否 | task 分配策略, 默认值为 0,表示按照topic分配任务,每一个table便是一个topic | 0 |
+|task-parallelism | Integer | 否 | task parallelism,默认值为 1,表示将topic拆分为多少个任务进行执行 | 2 |
+|source-cluster | String | 是 | sink 端获取路由信息连接到的RocketMQ nameserver 地址 | 172.17.0.1:10911 |
+|source-rocketmq | String | 是 | sink 端获取路由信息连接到的RocketMQ broker cluster 地址 | 127.0.0.1:9876 |
+|source-record-converter | String | 是 | source data 解析 | org.apache.rocketmq.connect.runtime.converter.RocketMQConverter |
+|src-cluster | String | 否 | 源集群 | DefaultCluster |
+|refresh-interval | String | 否 | sink的刷新时间,单位ms | 10000 |
+|schemaPath | String | 是 | sink的schema地址 | /Users/osgoo/Downloads/user.avsc" |
+
+
+示例配置如下
+```js
+{
+	"connector-class": "org.apache.rocketmq.connect.hudi.connector.HudiSinkConnector",
+	"topicNames": "topicc",
+	"tablePath": "file:///tmp/hudi_connector_test",
+	"tableName": "hudi_connector_test_table",
+	"insertShuffleParallelism": "2",
+	"upsertShuffleParallelism": "2",
+	"deleteParallelism": "2",
+	"source-record-converter": "org.apache.rocketmq.connect.runtime.converter.RocketMQConverter",
+	"source-rocketmq": "127.0.0.1:9876",
+	"src-cluster": "DefaultCluster",
+	"refresh-interval": "10000",
+	"schemaPath": "/Users/osgoo/Downloads/user.avsc"
+}
+```
+
+* **spark-submit 启动任务**
+将connect-runtime打包后通过spark-submit提交任务
+```
+nohup sh spark-submit 	--class org.apache.rocketmq.connect.runtime.ConnectStartup --conf "spark.driver.extraJavaOptions=-Dlogback.configurationFile=logback.xml" --files /xxx/conf/connect.conf,/xxx/conf/log4j.properties  --packages org.apache.hudi:hudi-spark3-bundle_2.12:0.8.0,org.apache.spark:spark-avro_2.12:3.0.1,org.apache.hudi:hudi-java-client:0.8.0,org.apache.parquet:parquet-avro:1.10.1,org.apache.avro:avro:1.10.2,com.alibaba:fastjson:1.2.51,org.reflections:reflections:0.9.11,org.apa [...]
+```
+后续操作参考rocketmq-connect-hudi启动步骤
\ No newline at end of file
diff --git a/pom.xml b/pom.xml
new file mode 100644
index 0000000..97c8785
--- /dev/null
+++ b/pom.xml
@@ -0,0 +1,287 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+
+    <groupId>org.apache.rocketmq</groupId>
+    <artifactId>rocketmq-connect-hudi</artifactId>
+    <version>0.0.1-SNAPSHOT</version>
+
+    <properties>
+        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+        <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
+
+        <!-- Compiler settings properties -->
+        <maven.compiler.source>1.8</maven.compiler.source>
+        <maven.compiler.target>1.8</maven.compiler.target>
+        <rocketmq.version>4.5.2</rocketmq.version>
+
+        <hudi.version>0.8.0</hudi.version>
+        <avro.version>1.10.2</avro.version>
+        <parquet.version>1.10.1</parquet.version>
+    </properties>
+
+    <build>
+        <plugins>
+            <plugin>
+                <groupId>org.codehaus.mojo</groupId>
+                <artifactId>versions-maven-plugin</artifactId>
+                <version>2.3</version>
+            </plugin>
+            <plugin>
+                <groupId>org.codehaus.mojo</groupId>
+                <artifactId>clirr-maven-plugin</artifactId>
+                <version>2.7</version>
+            </plugin>
+            <plugin>
+                <artifactId>maven-dependency-plugin</artifactId>
+                <configuration>
+                    <outputDirectory>${project.build.directory}/lib</outputDirectory>
+                    <excludeTransitive>false</excludeTransitive>
+                    <stripVersion>true</stripVersion>
+                </configuration>
+            </plugin>
+            <plugin>
+                <artifactId>maven-compiler-plugin</artifactId>
+                <version>3.6.1</version>
+                <configuration>
+                    <source>${maven.compiler.source}</source>
+                    <target>${maven.compiler.target}</target>
+                    <compilerVersion>${maven.compiler.source}</compilerVersion>
+                    <showDeprecation>true</showDeprecation>
+                    <showWarnings>true</showWarnings>
+                </configuration>
+            </plugin>
+            <plugin>
+                <artifactId>maven-surefire-plugin</artifactId>
+                <version>2.19.1</version>
+                <configuration>
+                    <argLine>-Xms512m -Xmx1024m</argLine>
+                    <forkMode>always</forkMode>
+                    <includes>
+                        <include>**/*Test.java</include>
+                    </includes>
+                </configuration>
+            </plugin>
+            <plugin>
+                <artifactId>maven-site-plugin</artifactId>
+                <version>3.6</version>
+                <configuration>
+                    <locales>en_US</locales>
+                    <outputEncoding>UTF-8</outputEncoding>
+                    <inputEncoding>UTF-8</inputEncoding>
+                </configuration>
+            </plugin>
+            <plugin>
+                <artifactId>maven-source-plugin</artifactId>
+                <version>3.0.1</version>
+                <executions>
+                    <execution>
+                        <id>attach-sources</id>
+                        <goals>
+                            <goal>jar</goal>
+                        </goals>
+                    </execution>
+                </executions>
+            </plugin>
+            <plugin>
+                <groupId>org.apache.rat</groupId>
+                <artifactId>apache-rat-plugin</artifactId>
+                <version>0.12</version>
+                <configuration>
+                    <excludes>
+                        <exclude>README.md</exclude>
+                    </excludes>
+                </configuration>
+            </plugin>
+            <plugin>
+                <artifactId>maven-javadoc-plugin</artifactId>
+                <version>2.10.4</version>
+                <configuration>
+                    <charset>UTF-8</charset>
+                    <locale>en_US</locale>
+                    <excludePackageNames>io.openmessaging.internal</excludePackageNames>
+                </configuration>
+                <executions>
+                    <execution>
+                        <id>aggregate</id>
+                        <goals>
+                            <goal>aggregate</goal>
+                        </goals>
+                        <phase>site</phase>
+                    </execution>
+                </executions>
+            </plugin>
+            <plugin>
+                <artifactId>maven-resources-plugin</artifactId>
+                <version>3.0.2</version>
+                <configuration>
+                    <encoding>${project.build.sourceEncoding}</encoding>
+                </configuration>
+            </plugin>
+            <plugin>
+                <groupId>org.codehaus.mojo</groupId>
+                <artifactId>findbugs-maven-plugin</artifactId>
+                <version>3.0.4</version>
+            </plugin>
+            <plugin>
+                <artifactId>maven-assembly-plugin</artifactId>
+                <version>3.0.0</version>
+                <configuration>
+                    <archive>
+                        <!-- The Main Class Here doesn't make a lot sense since it was dynamically loaded-->
+                        <manifest>
+                            <mainClass>org.apache.rocketmq.connect.hudi.connector.HudiSinkConnector</mainClass>
+                        </manifest>
+                    </archive>
+                    <descriptorRefs>
+                        <descriptorRef>jar-with-dependencies</descriptorRef>
+                    </descriptorRefs>
+                </configuration>
+                <executions>
+                    <execution>
+                        <id>make-assembly</id>
+                        <phase>package</phase>
+                        <goals>
+                            <goal>single</goal>
+                        </goals>
+                    </execution>
+                </executions>
+            </plugin>
+            <plugin>
+                <artifactId>maven-checkstyle-plugin</artifactId>
+                <version>2.17</version>
+                <executions>
+                    <execution>
+                        <id>verify</id>
+                        <phase>verify</phase>
+                        <configuration>
+                            <configLocation>style/rmq_checkstyle.xml</configLocation>
+                            <encoding>UTF-8</encoding>
+                            <consoleOutput>true</consoleOutput>
+                            <failsOnError>true</failsOnError>
+                            <includeTestSourceDirectory>false</includeTestSourceDirectory>
+                            <includeTestResources>false</includeTestResources>
+                        </configuration>
+                        <goals>
+                            <goal>check</goal>
+                        </goals>
+                    </execution>
+                </executions>
+            </plugin>
+        </plugins>
+    </build>
+
+    <dependencies>
+        <dependency>
+            <groupId>junit</groupId>
+            <artifactId>junit</artifactId>
+            <version>4.11</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.assertj</groupId>
+            <artifactId>assertj-core</artifactId>
+            <version>2.6.0</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.mockito</groupId>
+            <artifactId>mockito-core</artifactId>
+            <version>2.6.3</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>commons-codec</groupId>
+            <artifactId>commons-codec</artifactId>
+            <version>1.12</version>
+        </dependency>
+        <dependency>
+            <groupId>io.openmessaging</groupId>
+            <artifactId>openmessaging-connector</artifactId>
+            <version>0.1.1</version>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
+            <groupId>io.openmessaging</groupId>
+            <artifactId>openmessaging-api</artifactId>
+            <version>0.3.1-alpha</version>
+        </dependency>
+        <dependency>
+            <groupId>org.slf4j</groupId>
+            <artifactId>slf4j-api</artifactId>
+            <version>1.7.7</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.rocketmq</groupId>
+            <artifactId>rocketmq-client</artifactId>
+            <version>${rocketmq.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.rocketmq</groupId>
+            <artifactId>rocketmq-tools</artifactId>
+            <version>${rocketmq.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.rocketmq</groupId>
+            <artifactId>rocketmq-remoting</artifactId>
+            <version>${rocketmq.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.rocketmq</groupId>
+            <artifactId>rocketmq-openmessaging</artifactId>
+            <version>4.3.2</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.hudi</groupId>
+            <artifactId>hudi-java-client</artifactId>
+            <version>${hudi.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.parquet</groupId>
+            <artifactId>parquet-avro</artifactId>
+            <version>${parquet.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.avro</groupId>
+            <artifactId>avro</artifactId>
+            <version>${avro.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>com.alibaba</groupId>
+            <artifactId>fastjson</artifactId>
+            <version>1.2.51</version>
+        </dependency>
+
+        <!-- used for spark-submit -->
+        <dependency>
+            <groupId>org.pentaho</groupId>
+            <artifactId>pentaho-aggdesigner-algorithm</artifactId>
+            <version>5.1.5-jhyde</version>
+        </dependency>
+        <dependency>
+            <groupId>asm</groupId>
+            <artifactId>asm</artifactId>
+            <version>3.2</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.hive</groupId>
+            <artifactId>hive-exec</artifactId>
+            <version>2.3.7</version>
+        </dependency>
+        <dependency>
+            <groupId>com.google.guava</groupId>
+            <artifactId>failureaccess</artifactId>
+            <version>1.0</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.hadoop</groupId>
+            <artifactId>hadoop-common</artifactId>
+            <version>3.3.1</version>
+        </dependency>
+
+    </dependencies>
+</project>
\ No newline at end of file
diff --git a/src/main/java/org/apache/rocketmq/connect/hudi/config/CloneUtils.java b/src/main/java/org/apache/rocketmq/connect/hudi/config/CloneUtils.java
new file mode 100644
index 0000000..dc3605d
--- /dev/null
+++ b/src/main/java/org/apache/rocketmq/connect/hudi/config/CloneUtils.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.connect.hudi.config;
+
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.ObjectOutputStream;
+import java.io.Serializable;
+import java.io.ObjectInputStream;
+
+public class CloneUtils {
+    private static final Logger log = LoggerFactory.getLogger(CloneUtils.class);
+
+    @SuppressWarnings("unchecked")
+    public static <T extends Serializable> T clone(T obj) {
+        T clonedObj = null;
+        try {
+            ByteArrayOutputStream baos = new ByteArrayOutputStream();
+            ObjectOutputStream oos = new ObjectOutputStream(baos);
+            oos.writeObject(obj);
+            oos.close();
+
+            ByteArrayInputStream bais = new ByteArrayInputStream(baos.toByteArray());
+            ObjectInputStream ois = new ObjectInputStream(bais);
+            clonedObj = (T) ois.readObject();
+            ois.close();
+        } catch (Exception e) {
+            log.error("Clone occur exception", e);
+        }
+        return clonedObj;
+    }
+}
diff --git a/src/main/java/org/apache/rocketmq/connect/hudi/config/ConfigUtil.java b/src/main/java/org/apache/rocketmq/connect/hudi/config/ConfigUtil.java
new file mode 100644
index 0000000..88f8a8e
--- /dev/null
+++ b/src/main/java/org/apache/rocketmq/connect/hudi/config/ConfigUtil.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.rocketmq.connect.hudi.config;
+
+import io.openmessaging.KeyValue;
+
+import java.lang.reflect.Method;
+
+public class ConfigUtil {
+    public static <T> void load(KeyValue props, Object object) {
+
+        properties2Object(props, object);
+    }
+
+    private static <T> void properties2Object(final KeyValue p, final Object object) {
+
+        Method[] methods = object.getClass().getMethods();
+        for (Method method : methods) {
+            String mn = method.getName();
+            if (mn.startsWith("set")) {
+                try {
+                    String tmp = mn.substring(4);
+                    String first = mn.substring(3, 4);
+
+                    String key = first.toLowerCase() + tmp;
+                    String property = p.getString(key);
+                    if (property != null) {
+                        Class<?>[] pt = method.getParameterTypes();
+                        if (pt != null && pt.length > 0) {
+                            String cn = pt[0].getSimpleName();
+                            Object arg;
+                            if (cn.equals("int") || cn.equals("Integer")) {
+                                arg = Integer.parseInt(property);
+                            } else if (cn.equals("long") || cn.equals("Long")) {
+                                arg = Long.parseLong(property);
+                            } else if (cn.equals("double") || cn.equals("Double")) {
+                                arg = Double.parseDouble(property);
+                            } else if (cn.equals("boolean") || cn.equals("Boolean")) {
+                                arg = Boolean.parseBoolean(property);
+                            } else if (cn.equals("float") || cn.equals("Float")) {
+                                arg = Float.parseFloat(property);
+                            } else if (cn.equals("String")) {
+                                arg = property;
+                            } else {
+                                continue;
+                            }
+                            method.invoke(object, arg);
+                        }
+                    }
+                } catch (Throwable ignored) {
+                }
+            }
+        }
+    }
+}
diff --git a/src/main/java/org/apache/rocketmq/connect/hudi/config/HudiConnectConfig.java b/src/main/java/org/apache/rocketmq/connect/hudi/config/HudiConnectConfig.java
new file mode 100644
index 0000000..4c04605
--- /dev/null
+++ b/src/main/java/org/apache/rocketmq/connect/hudi/config/HudiConnectConfig.java
@@ -0,0 +1,173 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.rocketmq.connect.hudi.config;
+
+
+import org.apache.avro.Schema;
+
+import java.util.HashSet;
+import java.util.Set;
+
+
+public class HudiConnectConfig {
+
+    protected String tableType = "COPY_ON_WRITE";
+
+    protected String tablePath;
+
+    protected String tableName;
+
+    protected int insertShuffleParallelism = 2;
+
+    protected int upsertShuffleParallelism = 2;
+
+    protected int deleteParallelism = 2;
+
+    protected String srcRecordConverter;
+
+    protected String topicNames;
+
+    protected String indexType = "INMEMORY";
+
+    protected String schemaPath;
+
+    public Schema schema;
+
+    public static final String CONN_TASK_PARALLELISM = "task-parallelism";
+    public static final String CONN_TASK_DIVIDE_STRATEGY = "task-divide-strategy";
+    public static final String CONN_WHITE_LIST = "whiteDataBase";
+    public static final String CONN_SOURCE_RECORD_CONVERTER = "source-record-converter";
+
+    public static final String CONN_HUDI_TABLE_TYPE = "tableType";
+    public static final String CONN_HUDI_TABLE_PATH = "tablePath";
+    public static final String CONN_HUDI_TABLE_NAME = "tableName";
+    public static final String CONN_HUDI_INSERT_SHUFFLE_PARALLELISM = "insertShuffleParallelism";
+    public static final String CONN_HUDI_UPSERT_SHUFFLE_PARALLELISM = "upsertShuffleParallelism";
+    public static final String CONN_HUDI_DELETE_PARALLELISM = "deleteParallelism";
+
+    public static final String CONN_TOPIC_NAMES = "topicNames";
+    public static final String CONN_TOPIC_QUEUES = "topicQueues";
+    public static final String CONN_SCHEMA_PATH = "schemaPath";
+
+    public static final String CONN_TOPIC_ROUTE_INFO = "topicRouterInfo";
+
+    public static final String CONN_SOURCE_RMQ = "source-rocketmq";
+    public static final String CONN_SOURCE_CLUSTER = "source-cluster";
+    public static final String REFRESH_INTERVAL = "refresh.interval";
+
+    public static final Set<String> REQUEST_CONFIG = new HashSet<String>() {
+        {
+            add(CONN_HUDI_TABLE_PATH);
+            add(CONN_HUDI_TABLE_NAME);
+            add(CONN_HUDI_INSERT_SHUFFLE_PARALLELISM);
+            add(CONN_HUDI_UPSERT_SHUFFLE_PARALLELISM);
+            add(CONN_HUDI_DELETE_PARALLELISM);
+            add(CONN_SOURCE_RECORD_CONVERTER);
+            add(CONN_TOPIC_NAMES);
+            add(CONN_SCHEMA_PATH);
+        }
+    };
+
+    public String getTableType() {
+        return tableType;
+    }
+
+    public void setTableType(String tableType) {
+        this.tableType = tableType;
+    }
+
+    public String getTablePath() {
+        return tablePath;
+    }
+
+    public void setTablePath(String tablePath) {
+        this.tablePath = tablePath;
+    }
+
+    public String getTableName() {
+        return tableName;
+    }
+
+    public void setTableName(String tableName) {
+        this.tableName = tableName;
+    }
+
+    public int getInsertShuffleParallelism() {
+        return insertShuffleParallelism;
+    }
+
+    public void setInsertShuffleParallelism(int insertShuffleParallelism) {
+        this.insertShuffleParallelism = insertShuffleParallelism;
+    }
+
+    public int getUpsertShuffleParallelism() {
+        return upsertShuffleParallelism;
+    }
+
+    public void setUpsertShuffleParallelism(int upsertShuffleParallelism) {
+        this.upsertShuffleParallelism = upsertShuffleParallelism;
+    }
+
+    public int getDeleteParallelism() {
+        return deleteParallelism;
+    }
+
+    public void setDeleteParallelism(int deleteParallelism) {
+        this.deleteParallelism = deleteParallelism;
+    }
+
+    public String getSrcRecordConverter() {
+        return srcRecordConverter;
+    }
+
+    public void setSrcRecordConverter(String srcRecordConverter) {
+        this.srcRecordConverter = srcRecordConverter;
+    }
+
+    public String getTopicNames() {
+        return topicNames;
+    }
+
+    public void setTopicNames(String topicNames) {
+        this.topicNames = topicNames;
+    }
+
+    public String getIndexType() {
+        return indexType;
+    }
+
+    public void setIndexType(String indexType) {
+        this.indexType = indexType;
+    }
+
+    public String getSchemaPath() {
+        return schemaPath;
+    }
+
+    public void setSchemaPath(String schemaPath) {
+        this.schemaPath = schemaPath;
+    }
+
+    public Schema getSchema() {
+        return schema;
+    }
+
+    public void setSchema(Schema schema) {
+        this.schema = schema;
+    }
+}
\ No newline at end of file
diff --git a/src/main/java/org/apache/rocketmq/connect/hudi/config/SinkConnectConfig.java b/src/main/java/org/apache/rocketmq/connect/hudi/config/SinkConnectConfig.java
new file mode 100644
index 0000000..943df40
--- /dev/null
+++ b/src/main/java/org/apache/rocketmq/connect/hudi/config/SinkConnectConfig.java
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.rocketmq.connect.hudi.config;
+
+import io.openmessaging.KeyValue;
+import org.apache.rocketmq.common.message.MessageQueue;
+
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+
+public class SinkConnectConfig extends HudiConnectConfig {
+    private Set<String> whiteList;
+    private String srcNamesrvs;
+    private String srcCluster;
+    private long refreshInterval;
+    private Map<String, Set<MessageQueue>> topicRouteMap;
+    public int taskParallelism;
+    private String taskDivideStrategy;
+
+    public SinkConnectConfig(){
+    }
+
+    public void validate(KeyValue config) {
+        buildWhiteList(config);
+        this.tablePath = config.getString(HudiConnectConfig.CONN_HUDI_TABLE_PATH);
+        this.tableName = config.getString(HudiConnectConfig.CONN_HUDI_TABLE_NAME);
+        this.insertShuffleParallelism = config.getInt(HudiConnectConfig.CONN_HUDI_INSERT_SHUFFLE_PARALLELISM);
+        this.deleteParallelism = config.getInt(HudiConnectConfig.CONN_HUDI_DELETE_PARALLELISM);
+        this.upsertShuffleParallelism = config.getInt(HudiConnectConfig.CONN_HUDI_UPSERT_SHUFFLE_PARALLELISM);
+        this.setSrcRecordConverter(config.getString(HudiConnectConfig.CONN_SOURCE_RECORD_CONVERTER));
+        this.setTopicNames(config.getString(HudiConnectConfig.CONN_TOPIC_NAMES));
+        this.setSchemaPath(config.getString(HudiConnectConfig.CONN_SCHEMA_PATH));
+
+        this.srcNamesrvs = config.getString(HudiConnectConfig.CONN_SOURCE_RMQ);
+        this.srcCluster = config.getString(HudiConnectConfig.CONN_SOURCE_CLUSTER);
+        this.refreshInterval = config.getLong(HudiConnectConfig.REFRESH_INTERVAL, 3);
+
+    }
+
+    private void buildWhiteList(KeyValue config) {
+        this.whiteList = new HashSet<>();
+        String whiteListStr = config.getString(HudiConnectConfig.CONN_TOPIC_NAMES, "");
+        String[] wl = whiteListStr.trim().split(",");
+        if (wl.length <= 0)
+            throw new IllegalArgumentException("White list must be not empty.");
+        else {
+            this.whiteList.clear();
+            for (String t : wl) {
+                this.whiteList.add(t.trim());
+            }
+        }
+    }
+
+
+    public Set<String> getWhiteList() {
+        return whiteList;
+    }
+
+    public void setWhiteList(Set<String> whiteList) {
+        this.whiteList = whiteList;
+    }
+
+    public String getSrcNamesrvs() {
+        return this.srcNamesrvs;
+    }
+
+    public String getSrcCluster() {
+        return this.srcCluster;
+    }
+
+    public long getRefreshInterval() {
+        return this.refreshInterval;
+    }
+
+    public Map<String, Set<MessageQueue>> getTopicRouteMap() {
+        return topicRouteMap;
+    }
+
+    public void setTopicRouteMap(Map<String, Set<MessageQueue>> topicRouteMap) {
+        this.topicRouteMap = topicRouteMap;
+    }
+
+    public Set<String> getWhiteTopics() {
+        return getWhiteList();
+    }
+
+    public int getTaskParallelism() {
+        return taskParallelism;
+    }
+
+    public void setTaskParallelism(int taskParallelism) {
+        this.taskParallelism = taskParallelism;
+    }
+
+    public String getTaskDivideStrategy() {
+        return taskDivideStrategy;
+    }
+
+    public void setTaskDivideStrategy(String taskDivideStrategy) {
+        this.taskDivideStrategy = taskDivideStrategy;
+    }
+
+    @Override
+    public String toString() {
+        return "SinkConnectConfig{" +
+                "whiteList=" + whiteList +
+                ", srcNamesrvs='" + srcNamesrvs + '\'' +
+                ", srcCluster='" + srcCluster + '\'' +
+                ", refreshInterval=" + refreshInterval +
+                ", topicRouteMap=" + topicRouteMap +
+                ", tableType='" + tableType + '\'' +
+                ", tablePath='" + tablePath + '\'' +
+                ", tableName='" + tableName + '\'' +
+                ", insertShuffleParallelism=" + insertShuffleParallelism +
+                ", upsertShuffleParallelism=" + upsertShuffleParallelism +
+                ", deleteParallelism=" + deleteParallelism +
+                ", indexType='" + indexType + '\'' +
+                ", schemaPath='" + schemaPath + '\'' +
+                ", schema=" + schema +
+                '}';
+    }
+}
diff --git a/src/main/java/org/apache/rocketmq/connect/hudi/config/Utils.java b/src/main/java/org/apache/rocketmq/connect/hudi/config/Utils.java
new file mode 100644
index 0000000..d9bc6fe
--- /dev/null
+++ b/src/main/java/org/apache/rocketmq/connect/hudi/config/Utils.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.rocketmq.connect.hudi.config;
+
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.rocketmq.client.exception.MQClientException;
+import org.apache.rocketmq.common.protocol.route.BrokerData;
+import org.apache.rocketmq.common.protocol.route.TopicRouteData;
+import org.apache.rocketmq.remoting.exception.RemotingException;
+import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+public class Utils {
+    private static final Logger log = LoggerFactory.getLogger(Utils.class);
+
+    public static String createGroupName(String prefix) {
+        return new StringBuilder().append(prefix).append("-").append(System.currentTimeMillis()).toString();
+    }
+
+    public static String createGroupName(String prefix, String postfix) {
+        return new StringBuilder().append(prefix).append("-").append(postfix).toString();
+    }
+
+    public static String createTaskId(String prefix) {
+        return new StringBuilder().append(prefix).append("-").append(System.currentTimeMillis()).toString();
+    }
+
+    public static String createInstanceName(String namesrvAddr) {
+        String[] namesrvArray = namesrvAddr.split(";");
+        List<String> namesrvList = new ArrayList<>();
+        for (String ns : namesrvArray) {
+            if (!namesrvList.contains(ns)) {
+                namesrvList.add(ns);
+            }
+        }
+        Collections.sort(namesrvList);
+        return String.valueOf(namesrvList.toString().hashCode());
+    }
+
+    public static List<BrokerData> examineBrokerData(DefaultMQAdminExt defaultMQAdminExt, String topic,
+        String cluster) throws RemotingException, MQClientException, InterruptedException {
+        List<BrokerData> brokerList = new ArrayList<>();
+
+        TopicRouteData topicRouteData = defaultMQAdminExt.examineTopicRouteInfo(topic);
+        if (topicRouteData.getBrokerDatas() != null) {
+            for (BrokerData broker : topicRouteData.getBrokerDatas()) {
+                if (StringUtils.equals(broker.getCluster(), cluster)) {
+                    brokerList.add(broker);
+                }
+            }
+        }
+        return brokerList;
+    }
+}
diff --git a/src/main/java/org/apache/rocketmq/connect/hudi/connector/HudiSinkConnector.java b/src/main/java/org/apache/rocketmq/connect/hudi/connector/HudiSinkConnector.java
new file mode 100644
index 0000000..a496418
--- /dev/null
+++ b/src/main/java/org/apache/rocketmq/connect/hudi/connector/HudiSinkConnector.java
@@ -0,0 +1,250 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.connect.hudi.connector;
+
+
+import com.alibaba.fastjson.JSONObject;
+import io.openmessaging.KeyValue;
+import io.openmessaging.connector.api.Task;
+import io.openmessaging.connector.api.sink.SinkConnector;
+import io.openmessaging.internal.DefaultKeyValue;
+import org.apache.commons.lang3.concurrent.BasicThreadFactory;
+import org.apache.rocketmq.client.exception.MQClientException;
+import org.apache.rocketmq.common.message.MessageQueue;
+import org.apache.rocketmq.common.protocol.route.BrokerData;
+import org.apache.rocketmq.common.protocol.route.QueueData;
+import org.apache.rocketmq.common.protocol.route.TopicRouteData;
+import org.apache.rocketmq.connect.hudi.config.HudiConnectConfig;
+import org.apache.rocketmq.connect.hudi.config.SinkConnectConfig;
+import org.apache.rocketmq.connect.hudi.config.CloneUtils;
+import org.apache.rocketmq.connect.hudi.config.Utils;
+import org.apache.rocketmq.connect.hudi.strategy.ITaskDivideStrategy;
+import org.apache.rocketmq.connect.hudi.strategy.TaskDivideStrategyFactory;
+import org.apache.rocketmq.remoting.RPCHook;
+import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+import java.util.Set;
+import java.util.Map;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.ArrayList;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+
+
+public class HudiSinkConnector extends SinkConnector {
+    private static final Logger log = LoggerFactory.getLogger(HudiSinkConnector.class);
+    private volatile boolean configValid = false;
+    private ScheduledExecutorService executor;
+    private HashMap<String, Set<MessageQueue>> topicRouteMap;
+
+    private DefaultMQAdminExt srcMQAdminExt;
+    private SinkConnectConfig sinkConnectConfig;
+
+    private volatile boolean adminStarted;
+
+    private ScheduledFuture<?> listenerHandle;
+    public static final String HUDI_CONNECTOR_ADMIN_PREFIX = "HUDI-CONNECTOR-ADMIN";
+    public static final String PREFIX = "hudi";
+
+    public HudiSinkConnector() {
+        topicRouteMap = new HashMap<>();
+        sinkConnectConfig = new SinkConnectConfig();
+        executor = Executors.newSingleThreadScheduledExecutor(new BasicThreadFactory.Builder().namingPattern("HudiFSinkConnector-SinkWatcher-%d").daemon(true).build());
+    }
+
+    private synchronized void startMQAdminTools() {
+        if (!configValid || adminStarted) {
+            return;
+        }
+        RPCHook rpcHook = null;
+        this.srcMQAdminExt = new DefaultMQAdminExt(rpcHook);
+        this.srcMQAdminExt.setNamesrvAddr(this.sinkConnectConfig.getSrcNamesrvs());
+        this.srcMQAdminExt.setAdminExtGroup(Utils.createGroupName(HUDI_CONNECTOR_ADMIN_PREFIX));
+        this.srcMQAdminExt.setInstanceName(Utils.createInstanceName(this.sinkConnectConfig.getSrcNamesrvs()));
+
+        try {
+            log.info("Trying to start srcMQAdminExt");
+            this.srcMQAdminExt.start();
+            log.info("RocketMQ srcMQAdminExt started");
+
+        } catch (MQClientException e) {
+            log.error("Hudi Sink Task start failed for `srcMQAdminExt` exception.", e);
+        }
+
+        adminStarted = true;
+    }
+
+    @Override
+    public String verifyAndSetConfig(KeyValue config) {
+        for (String requestKey : HudiConnectConfig.REQUEST_CONFIG) {
+            if (!config.containsKey(requestKey)) {
+                return "Request config key: " + requestKey;
+            }
+        }
+        try {
+            this.sinkConnectConfig.validate(config);
+        } catch (IllegalArgumentException e) {
+            return e.getMessage();
+        }
+        this.configValid = true;
+
+        return "";
+    }
+
+    @Override
+    public void start() {
+        startMQAdminTools();
+        startListener();
+    }
+
+    public void startListener() {
+        listenerHandle = executor.scheduleAtFixedRate(new Runnable() {
+            boolean first = true;
+            HashMap<String, Set<MessageQueue>> origin = null;
+
+            @Override
+            public void run() {
+                buildRoute();
+                if (first) {
+                    origin = CloneUtils.clone(topicRouteMap);
+                    first = false;
+                }
+                if (!compare(origin, topicRouteMap)) {
+                    context.requestTaskReconfiguration();
+                    origin = CloneUtils.clone(topicRouteMap);
+                }
+            }
+        }, sinkConnectConfig.getRefreshInterval(), sinkConnectConfig.getRefreshInterval(), TimeUnit.SECONDS);
+    }
+
+    public boolean compare(Map<String, Set<MessageQueue>> origin, Map<String, Set<MessageQueue>> updated) {
+        if (origin.size() != updated.size()) {
+            return false;
+        }
+        for (Map.Entry<String, Set<MessageQueue>> entry : origin.entrySet()) {
+            if (!updated.containsKey(entry.getKey())) {
+                return false;
+            }
+            Set<MessageQueue> originTasks = entry.getValue();
+            Set<MessageQueue> updateTasks = updated.get(entry.getKey());
+            if (originTasks.size() != updateTasks.size()) {
+                return false;
+            }
+
+            if (!originTasks.containsAll(updateTasks)) {
+                return false;
+            }
+        }
+
+        return true;
+    }
+
+    public void buildRoute() {
+        String srcCluster = this.sinkConnectConfig.getSrcCluster();
+        try {
+            for (String topic : this.sinkConnectConfig.getWhiteList()) {
+
+                // different from BrokerData with cluster field, which can ensure the brokerData is from expected cluster.
+                // QueueData use brokerName as unique info on cluster of rocketmq. so when we want to get QueueData of
+                // expected cluster, we should get brokerNames of expected cluster, and then filter queueDatas.
+                List<BrokerData> brokerList = Utils.examineBrokerData(this.srcMQAdminExt, topic, srcCluster);
+                Set<String> brokerNameSet = new HashSet<String>();
+                for (BrokerData b : brokerList) {
+                    brokerNameSet.add(b.getBrokerName());
+                }
+
+                TopicRouteData topicRouteData = srcMQAdminExt.examineTopicRouteInfo(topic);
+                if (!topicRouteMap.containsKey(topic)) {
+                    topicRouteMap.put(topic, new HashSet<>(16));
+                }
+                for (QueueData qd : topicRouteData.getQueueDatas()) {
+                    if (brokerNameSet.contains(qd.getBrokerName())) {
+                        for (int i = 0; i < qd.getReadQueueNums(); i++) {
+                            MessageQueue taskTopicInfo = new MessageQueue(topic, qd.getBrokerName(), i);
+                            topicRouteMap.get(topic).add(taskTopicInfo);
+                        }
+                    }
+                }
+            }
+        } catch (Exception e) {
+            log.error("Fetch topic list error.", e);
+        } finally {
+            srcMQAdminExt.shutdown();
+        }
+    }
+
+
+    /**
+     * We need to reason why we don't call srcMQAdminExt.shutdown() here, and why
+     * it can be applied to srcMQAdminExt
+     */
+    @Override
+    public void stop() {
+        listenerHandle.cancel(true);
+        srcMQAdminExt.shutdown();
+    }
+
+    @Override
+    public void pause() {
+
+    }
+
+    @Override
+    public void resume() {
+
+    }
+
+    @Override
+    public Class<? extends Task> taskClass() {
+        return HudiSinkTask.class;
+    }
+
+    @Override
+    public List<KeyValue> taskConfigs() {
+        log.info("List.start");
+        if (!configValid) {
+            return new ArrayList<KeyValue>();
+        }
+        startMQAdminTools();
+        buildRoute();
+        DefaultKeyValue defaultKeyValue = new DefaultKeyValue();
+        defaultKeyValue.put(HudiConnectConfig.CONN_HUDI_TABLE_PATH, sinkConnectConfig.getTablePath());
+        defaultKeyValue.put(HudiConnectConfig.CONN_HUDI_TABLE_NAME, sinkConnectConfig.getTableName());
+        defaultKeyValue.put(HudiConnectConfig.CONN_HUDI_INSERT_SHUFFLE_PARALLELISM, sinkConnectConfig.getInsertShuffleParallelism());
+        defaultKeyValue.put(HudiConnectConfig.CONN_HUDI_UPSERT_SHUFFLE_PARALLELISM, sinkConnectConfig.getUpsertShuffleParallelism());
+        defaultKeyValue.put(HudiConnectConfig.CONN_HUDI_DELETE_PARALLELISM, sinkConnectConfig.getDeleteParallelism());
+        defaultKeyValue.put(HudiConnectConfig.CONN_SOURCE_RECORD_CONVERTER, sinkConnectConfig.getSrcRecordConverter());
+        defaultKeyValue.put(HudiConnectConfig.CONN_TOPIC_NAMES, sinkConnectConfig.getTopicNames());
+        defaultKeyValue.put(HudiConnectConfig.CONN_SCHEMA_PATH, sinkConnectConfig.getSchemaPath());
+        defaultKeyValue.put(HudiConnectConfig.CONN_TASK_PARALLELISM, sinkConnectConfig.getTaskParallelism());
+        defaultKeyValue.put(HudiConnectConfig.CONN_TASK_DIVIDE_STRATEGY, sinkConnectConfig.getTaskDivideStrategy());
+        defaultKeyValue.put(HudiConnectConfig.CONN_WHITE_LIST, JSONObject.toJSONString(sinkConnectConfig.getWhiteList()));
+        defaultKeyValue.put(HudiConnectConfig.CONN_SCHEMA_PATH, sinkConnectConfig.getSchemaPath());
+        defaultKeyValue.put(HudiConnectConfig.CONN_TOPIC_ROUTE_INFO, JSONObject.toJSONString(sinkConnectConfig.getTopicRouteMap()));
+        log.info("taskConfig : " + defaultKeyValue + ", sinkConnectConfig : " + sinkConnectConfig);
+        ITaskDivideStrategy strategy = TaskDivideStrategyFactory.getInstance();
+        List<KeyValue> taskConfigs = strategy.divide(defaultKeyValue);
+        return taskConfigs;
+    }
+}
diff --git a/src/main/java/org/apache/rocketmq/connect/hudi/connector/HudiSinkTask.java b/src/main/java/org/apache/rocketmq/connect/hudi/connector/HudiSinkTask.java
new file mode 100644
index 0000000..b01c660
--- /dev/null
+++ b/src/main/java/org/apache/rocketmq/connect/hudi/connector/HudiSinkTask.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.connect.hudi.connector;
+
+import io.openmessaging.KeyValue;
+import io.openmessaging.connector.api.common.QueueMetaData;
+import io.openmessaging.connector.api.data.SinkDataEntry;
+import io.openmessaging.connector.api.sink.SinkTask;
+import org.apache.rocketmq.connect.hudi.config.HudiConnectConfig;
+import org.apache.rocketmq.connect.hudi.config.ConfigUtil;
+import org.apache.rocketmq.connect.hudi.sink.Updater;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Collection;
+import java.util.Map;
+
+
+/**
+ * In the naming, we are using database for "keyspaces" and table for "columnFamily"
+ * This is because we kind of want the abstract data source to be aligned with SQL databases
+ */
+public class HudiSinkTask extends SinkTask {
+    private static final Logger log = LoggerFactory.getLogger(HudiSinkTask.class);
+
+    private HudiConnectConfig hudiConnectConfig;
+    private Updater updater;
+
+    public HudiSinkTask() {
+        this.hudiConnectConfig = new HudiConnectConfig();
+    }
+
+    @Override
+    public void put(Collection<SinkDataEntry> sinkDataEntries) {
+        try {
+            log.info("Hudi Sink Task trying to put()");
+            for (SinkDataEntry record : sinkDataEntries) {
+                log.info("Hudi Sink Task trying to call updater.push()");
+                Boolean isSuccess = updater.push(record);
+                if (!isSuccess) {
+                    log.error("Hudi sink push data error, record:{}", record);
+                }
+                log.debug("Hudi pushed data : " + record);
+            }
+        } catch (Exception e) {
+            log.error("put sinkDataEntries error, {}", e);
+        }
+    }
+
+    @Override
+    public void commit(Map<QueueMetaData, Long> map) {
+
+    }
+
+    /**
+     * Remember always close the CqlSession according to
+     * https://docs.datastax.com/en/developer/java-driver/4.5/manual/core/
+     * @param props
+     */
+    @Override
+    public void start(KeyValue props) {
+        try {
+            ConfigUtil.load(props, this.hudiConnectConfig);
+            log.info("init data source success");
+        } catch (Exception e) {
+            log.error("Cannot start Hudi Sink Task because of configuration error{}", e);
+        }
+        try {
+            updater = new Updater(hudiConnectConfig);
+            updater.start();
+        } catch (Throwable e) {
+            log.error("fail to start updater{}", e);
+        }
+
+    }
+
+    @Override
+    public void stop() {
+        try {
+            updater.stop();
+            log.info("hudi sink task connection is closed.");
+        } catch (Throwable e) {
+            log.warn("sink task stop error while closing connection to {}", "hudi", e);
+        }
+    }
+
+    @Override
+    public void pause() {
+
+    }
+
+    @Override
+    public void resume() {
+
+    }
+}
diff --git a/src/main/java/org/apache/rocketmq/connect/hudi/sink/Updater.java b/src/main/java/org/apache/rocketmq/connect/hudi/sink/Updater.java
new file mode 100644
index 0000000..8e7e288
--- /dev/null
+++ b/src/main/java/org/apache/rocketmq/connect/hudi/sink/Updater.java
@@ -0,0 +1,239 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.rocketmq.connect.hudi.sink;
+
+
+import io.openmessaging.connector.api.data.SinkDataEntry;
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.io.BinaryDecoder;
+import org.apache.avro.io.DatumReader;
+import org.apache.avro.io.DecoderFactory;
+import org.apache.avro.specific.SpecificDatumReader;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocalFileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.DistributedFileSystem;
+import org.apache.hudi.client.HoodieJavaWriteClient;
+import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.client.common.HoodieJavaEngineContext;
+import org.apache.hudi.common.engine.EngineType;
+import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.model.HoodieAvroPayload;
+import org.apache.hudi.common.model.HoodieKey;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.config.HoodieCompactionConfig;
+import org.apache.hudi.config.HoodieIndexConfig;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.index.HoodieIndex;
+import org.apache.parquet.avro.AvroReadSupport;
+import org.apache.parquet.avro.GenericDataSupplier;
+import org.apache.rocketmq.connect.hudi.config.HudiConnectConfig;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.List;
+import java.util.ArrayList;
+import java.util.UUID;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+public class Updater {
+
+    private final Logger log = LoggerFactory.getLogger(getClass());
+    private HudiConnectConfig hudiConnectConfig;
+    private HoodieJavaWriteClient hudiWriteClient;
+    private HoodieWriteConfig cfg;
+    private transient ScheduledExecutorService scheduledExecutor;
+    private int flushIntervalMs = 3000;
+    private int batchSize = 100;
+    private List<SinkDataEntry> inflightList;
+    private Object batchLocker = new Object();
+
+
+    public Updater(HudiConnectConfig hudiConnectConfig) throws Exception {
+        this.hudiConnectConfig = hudiConnectConfig;
+
+        try {
+            File schemaFile = new File(hudiConnectConfig.getSchemaPath());
+            this.hudiConnectConfig.schema = new Schema.Parser().parse(schemaFile);
+            log.info("Hudi schema : " + this.hudiConnectConfig.schema.toString());
+        } catch (IOException e) {
+            throw new Exception(String.format("Failed to find schema file %s", hudiConnectConfig.getSchemaPath()), e);
+        }
+        Configuration hadoopConf = new Configuration();
+        hadoopConf.setBoolean(AvroReadSupport.AVRO_COMPATIBILITY, false);
+        hadoopConf.set(AvroReadSupport.AVRO_DATA_SUPPLIER, GenericDataSupplier.class.getName());
+        hadoopConf.setClassLoader(this.getClass().getClassLoader());
+        hadoopConf.set("fs.hdfs.impl",
+                DistributedFileSystem.class.getName()
+        );
+        hadoopConf.set("fs.file.impl",
+                LocalFileSystem.class.getName()
+        );
+
+        // fs.%s.impl.disable.cache
+        hadoopConf.set("fs.file.impl.disable.cache", String.valueOf(true));
+        Thread.currentThread().setContextClassLoader(this.getClass().getClassLoader());
+
+        Path path = new Path(hudiConnectConfig.getTablePath());
+        FileSystem fs = FSUtils.getFs(hudiConnectConfig.getTablePath(), hadoopConf);
+        if (!fs.exists(path)) {
+            HoodieTableMetaClient.withPropertyBuilder()
+                    .setTableType(hudiConnectConfig.getTableType())
+                    .setTableName(hudiConnectConfig.getTableName())
+                    .setPayloadClassName(HoodieAvroPayload.class.getName())
+                    .initTable(hadoopConf, hudiConnectConfig.getTablePath());
+        }
+        log.info("Hudi inited table");
+
+        this.cfg = HoodieWriteConfig.newBuilder().withPath(hudiConnectConfig.getTablePath())
+                .withSchema(this.hudiConnectConfig.schema.toString())
+                .withEngineType(EngineType.JAVA)
+                .withParallelism(hudiConnectConfig.getInsertShuffleParallelism(), hudiConnectConfig.getUpsertShuffleParallelism())
+                .withDeleteParallelism(hudiConnectConfig.getDeleteParallelism()).forTable(hudiConnectConfig.getTableName())
+                .withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(HoodieIndex.IndexType.INMEMORY).build())
+                .withCompactionConfig(HoodieCompactionConfig.newBuilder().archiveCommitsWith(20, 30).build()).build();
+        cfg.getAvroSchemaValidate();
+        this.hudiWriteClient =
+                new HoodieJavaWriteClient<HoodieAvroPayload>(new HoodieJavaEngineContext(hadoopConf), cfg);
+        log.info("Open HoodieJavaWriteClient successfully");
+
+        inflightList = new ArrayList<>();
+        if (batchSize > 0) {
+            scheduledExecutor = Executors.newSingleThreadScheduledExecutor();
+            scheduledExecutor.scheduleAtFixedRate(
+                () -> {
+                    try {
+                        commit();
+                    } catch (Exception e) {
+                        log.error("Flush error when executed at fixed rate", e);
+                    }
+                }, flushIntervalMs, flushIntervalMs, TimeUnit.MILLISECONDS);
+        }
+    }
+
+    private GenericRecord sinkDataEntry2GenericRecord(SinkDataEntry record) {
+        byte[] recordBytes = (byte[]) record.getPayload()[0];
+        GenericRecord genericRecord = new GenericData.Record(this.hudiConnectConfig.schema);
+        DatumReader<GenericRecord> userDatumReader = new SpecificDatumReader<GenericRecord>(this.hudiConnectConfig.schema);
+        BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(recordBytes, null);
+        try {
+            if (!decoder.isEnd()) {
+                genericRecord = userDatumReader.read(genericRecord, decoder);
+            }
+        } catch (IOException e) {
+            log.error("SinkDataEntry convert to GenericRecord occur error,", e);
+        }
+        return genericRecord;
+    }
+
+    public boolean push(SinkDataEntry record) {
+        log.info("Updater Trying to push data");
+        Boolean isSuccess = true;
+        if (record == null) {
+            log.warn("Updater push sinkDataRecord null.");
+            return true;
+        }
+        synchronized (batchLocker) {
+            inflightList.add(record);
+        }
+        if (inflightList.size() >= batchSize) {
+            try {
+                scheduledExecutor.submit(this::commit);
+            } catch (Exception e) {
+                log.error("Updater commmit occur error", e);
+                isSuccess = false;
+            }
+        }
+        return isSuccess;
+    }
+
+    private void schemaEvolution(Schema newSchema, Schema oldSchema) {
+        if (null != oldSchema && oldSchema.toString().equals(newSchema.toString())) {
+            return;
+        }
+        log.info("Schema changed. New schema is " + newSchema.toString());
+        this.cfg = HoodieWriteConfig.newBuilder().withPath(hudiConnectConfig.getTablePath())
+                .withSchema(this.hudiConnectConfig.schema.toString())
+                .withEngineType(EngineType.JAVA)
+                .withParallelism(hudiConnectConfig.getInsertShuffleParallelism(), hudiConnectConfig.getUpsertShuffleParallelism())
+                .withDeleteParallelism(hudiConnectConfig.getDeleteParallelism()).forTable(hudiConnectConfig.getTableName())
+                .withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(HoodieIndex.IndexType.INMEMORY).build())
+                .withCompactionConfig(HoodieCompactionConfig.newBuilder().archiveCommitsWith(20, 30).build()).build();
+        this.hudiWriteClient.close();
+        Configuration hadoopConf = new Configuration();
+        hadoopConf.setBoolean(AvroReadSupport.AVRO_COMPATIBILITY, false);
+        hadoopConf.set(AvroReadSupport.AVRO_DATA_SUPPLIER, GenericDataSupplier.class.getName());
+        this.hudiWriteClient =
+                new HoodieJavaWriteClient<HoodieAvroPayload>(new HoodieJavaEngineContext(hadoopConf), cfg);
+    }
+
+    public void commit() {
+        List<SinkDataEntry> commitList;
+        if (inflightList.isEmpty()) {
+            return;
+        }
+        synchronized (this.inflightList) {
+            commitList = inflightList;
+            inflightList = new ArrayList<>();
+        }
+        List<HoodieRecord> hoodieRecordsList = new ArrayList<>();
+        for (SinkDataEntry record : commitList) {
+            GenericRecord genericRecord = sinkDataEntry2GenericRecord(record);
+            HoodieRecord<HoodieAvroPayload> hoodieRecord = new HoodieRecord(new HoodieKey(UUID.randomUUID().toString(), "shardingKey-" + record.getQueueName()), new HoodieAvroPayload(Option.of(genericRecord)));
+            hoodieRecordsList.add(hoodieRecord);
+        }
+        try {
+            List<WriteStatus> statuses = hudiWriteClient.upsert(hoodieRecordsList, hudiWriteClient.startCommit());
+            log.info("Upserted data to hudi");
+            long upserted = statuses.get(0).getStat().getNumInserts();
+            if (upserted != commitList.size()) {
+                log.warn("Upserted num not equals input");
+            }
+        } catch (Exception e) {
+            log.error("Exception when upserting to Hudi", e);
+        }
+    }
+
+    public void start() throws Exception {
+        log.info("schema load success");
+    }
+
+    public void stop() {
+        this.hudiWriteClient.close();
+        log.info("Hudi sink updater stopped.");
+    }
+
+    public HudiConnectConfig getHudiConnectConfig() {
+        return hudiConnectConfig;
+    }
+
+    public void setHudiConnectConfig(HudiConnectConfig hudiConnectConfig) {
+        this.hudiConnectConfig = hudiConnectConfig;
+    }
+
+}
diff --git a/src/main/java/org/apache/rocketmq/connect/hudi/strategy/ITaskDivideStrategy.java b/src/main/java/org/apache/rocketmq/connect/hudi/strategy/ITaskDivideStrategy.java
new file mode 100644
index 0000000..a91c066
--- /dev/null
+++ b/src/main/java/org/apache/rocketmq/connect/hudi/strategy/ITaskDivideStrategy.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.connect.hudi.strategy;
+
+import io.openmessaging.KeyValue;
+
+import java.util.List;
+
+
+public interface ITaskDivideStrategy {
+    List<KeyValue> divide(KeyValue source);
+}
diff --git a/src/main/java/org/apache/rocketmq/connect/hudi/strategy/TaskDivideByQueueStrategy.java b/src/main/java/org/apache/rocketmq/connect/hudi/strategy/TaskDivideByQueueStrategy.java
new file mode 100644
index 0000000..c68e17c
--- /dev/null
+++ b/src/main/java/org/apache/rocketmq/connect/hudi/strategy/TaskDivideByQueueStrategy.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.rocketmq.connect.hudi.strategy;
+
+import com.alibaba.fastjson.JSONObject;
+import io.openmessaging.KeyValue;
+import io.openmessaging.internal.DefaultKeyValue;
+import org.apache.rocketmq.common.message.MessageQueue;
+import org.apache.rocketmq.connect.hudi.config.HudiConnectConfig;
+
+
+import java.util.ArrayList;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+
+
+public class TaskDivideByQueueStrategy implements ITaskDivideStrategy {
+    @Override
+    public List<KeyValue> divide(KeyValue source) {
+        List<KeyValue> config = new ArrayList<KeyValue>();
+        int parallelism = source.getInt(HudiConnectConfig.CONN_TASK_PARALLELISM);
+        Map<String, MessageQueue> topicRouteInfos = (Map<String, MessageQueue>) JSONObject.parse(source.getString(HudiConnectConfig.CONN_TOPIC_ROUTE_INFO));
+        int id = 0;
+        List<List<String>> taskTopicQueues = new ArrayList<>(parallelism);
+        for (Map.Entry<String, MessageQueue> topicQueue : topicRouteInfos.entrySet()) {
+            MessageQueue messageQueue = topicQueue.getValue();
+            String topicQueueStr = messageQueue.getTopic() + "," + messageQueue.getBrokerName() + "," + messageQueue.getQueueId();
+            int ind = ++id % parallelism;
+            if (taskTopicQueues.get(ind) != null) {
+                List<String> taskTopicQueue = new LinkedList<>();
+                taskTopicQueue.add(topicQueueStr);
+                taskTopicQueues.add(ind, taskTopicQueue);
+            } else {
+                List<String> taskTopicQueue = taskTopicQueues.get(ind);
+                taskTopicQueue.add(topicQueueStr);
+            }
+        }
+
+        for (int i = 0; i < parallelism; i++) {
+            // build single task queue config; format is topicName1,brokerName1,queueId1;topicName1,brokerName1,queueId2
+            String singleTaskTopicQueueStr = "";
+            List<String> singleTaskTopicQueues = taskTopicQueues.get(i);
+            for (String singleTopicQueue : singleTaskTopicQueues) {
+                singleTaskTopicQueueStr += singleTopicQueue + ";";
+            }
+            singleTaskTopicQueueStr = singleTaskTopicQueueStr.substring(0, singleTaskTopicQueueStr.length() - 1);
+            // fill connect config;
+            KeyValue keyValue = new DefaultKeyValue();
+            keyValue.put(HudiConnectConfig.CONN_TOPIC_QUEUES, singleTaskTopicQueueStr);
+            keyValue.put(HudiConnectConfig.CONN_HUDI_TABLE_PATH, source.getString(HudiConnectConfig.CONN_HUDI_TABLE_PATH));
+            keyValue.put(HudiConnectConfig.CONN_HUDI_TABLE_NAME, source.getString(HudiConnectConfig.CONN_HUDI_TABLE_NAME));
+            keyValue.put(HudiConnectConfig.CONN_HUDI_INSERT_SHUFFLE_PARALLELISM, source.getInt(HudiConnectConfig.CONN_HUDI_INSERT_SHUFFLE_PARALLELISM));
+            keyValue.put(HudiConnectConfig.CONN_HUDI_UPSERT_SHUFFLE_PARALLELISM, source.getInt(HudiConnectConfig.CONN_HUDI_UPSERT_SHUFFLE_PARALLELISM));
+            keyValue.put(HudiConnectConfig.CONN_HUDI_DELETE_PARALLELISM, source.getInt(HudiConnectConfig.CONN_HUDI_DELETE_PARALLELISM));
+            keyValue.put(HudiConnectConfig.CONN_SOURCE_RECORD_CONVERTER, source.getString(HudiConnectConfig.CONN_SOURCE_RECORD_CONVERTER));
+            keyValue.put(HudiConnectConfig.CONN_SCHEMA_PATH, source.getString(HudiConnectConfig.CONN_SCHEMA_PATH));
+            keyValue.put(HudiConnectConfig.CONN_TASK_PARALLELISM, source.getInt(HudiConnectConfig.CONN_TASK_PARALLELISM));
+            keyValue.put(HudiConnectConfig.CONN_SCHEMA_PATH, source.getString(HudiConnectConfig.CONN_SCHEMA_PATH));
+            config.add(keyValue);
+        }
+
+        return config;
+    }
+}
diff --git a/src/main/java/org/apache/rocketmq/connect/hudi/strategy/TaskDivideStrategyFactory.java b/src/main/java/org/apache/rocketmq/connect/hudi/strategy/TaskDivideStrategyFactory.java
new file mode 100644
index 0000000..1d693a8
--- /dev/null
+++ b/src/main/java/org/apache/rocketmq/connect/hudi/strategy/TaskDivideStrategyFactory.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.rocketmq.connect.hudi.strategy;
+
+
+public class TaskDivideStrategyFactory {
+    public static ITaskDivideStrategy getInstance() {
+        return new TaskDivideByQueueStrategy();
+    }
+}
diff --git a/style/rmq_checkstyle.xml b/style/rmq_checkstyle.xml
new file mode 100644
index 0000000..776b305
--- /dev/null
+++ b/style/rmq_checkstyle.xml
@@ -0,0 +1,135 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements.  See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+  -->
+
+<!DOCTYPE module PUBLIC
+    "-//Puppy Crawl//DTD Check Configuration 1.3//EN"
+    "http://www.puppycrawl.com/dtds/configuration_1_3.dtd">
+<!--Refer http://checkstyle.sourceforge.net/reports/google-java-style.html#s2.2-file-encoding -->
+<module name="Checker">
+
+    <property name="localeLanguage" value="en"/>
+
+    <!--To configure the check to report on the first instance in each file-->
+    <module name="FileTabCharacter"/>
+
+    <!-- header -->
+    <module name="RegexpHeader">
+        <property name="header" value="/\*\nLicensed to the Apache Software Foundation*"/>
+    </module>
+
+    <module name="RegexpSingleline">
+        <property name="format" value="System\.out\.println"/>
+        <property name="message" value="Prohibit invoking System.out.println in source code !"/>
+    </module>
+
+    <module name="RegexpSingleline">
+        <property name="format" value="//FIXME"/>
+        <property name="message" value="Recommended fix FIXME task !"/>
+    </module>
+
+    <module name="RegexpSingleline">
+        <property name="format" value="//TODO"/>
+        <property name="message" value="Recommended fix TODO task !"/>
+    </module>
+
+    <module name="RegexpSingleline">
+        <property name="format" value="@alibaba"/>
+        <property name="message" value="Recommended remove @alibaba keyword!"/>
+    </module>
+    <module name="RegexpSingleline">
+        <property name="format" value="@taobao"/>
+        <property name="message" value="Recommended remove @taobao keyword!"/>
+    </module>
+    <module name="RegexpSingleline">
+        <property name="format" value="@author"/>
+        <property name="message" value="Recommended remove @author tag in javadoc!"/>
+    </module>
+
+    <module name="RegexpSingleline">
+        <property name="format"
+                  value=".*[\u3400-\u4DB5\u4E00-\u9FA5\u9FA6-\u9FBB\uF900-\uFA2D\uFA30-\uFA6A\uFA70-\uFAD9\uFF00-\uFFEF\u2E80-\u2EFF\u3000-\u303F\u31C0-\u31EF]+.*"/>
+        <property name="message" value="Not allow chinese character !"/>
+    </module>
+
+    <module name="FileLength">
+        <property name="max" value="3000"/>
+    </module>
+
+    <module name="TreeWalker">
+
+        <module name="UnusedImports">
+            <property name="processJavadoc" value="true"/>
+        </module>
+        <module name="RedundantImport"/>
+
+        <!--<module name="IllegalImport" />-->
+
+        <!--Checks that classes that override equals() also override hashCode()-->
+        <module name="EqualsHashCode"/>
+        <!--Checks for over-complicated boolean expressions. Currently finds code like if (topic == true), topic || true, !false, etc.-->
+        <module name="SimplifyBooleanExpression"/>
+        <module name="OneStatementPerLine"/>
+        <module name="UnnecessaryParentheses"/>
+        <!--Checks for over-complicated boolean return statements. For example the following code-->
+        <module name="SimplifyBooleanReturn"/>
+
+        <!--Check that the default is after all the cases in producerGroup switch statement-->
+        <module name="DefaultComesLast"/>
+        <!--Detects empty statements (standalone ";" semicolon)-->
+        <module name="EmptyStatement"/>
+        <!--Checks that long constants are defined with an upper ell-->
+        <module name="UpperEll"/>
+        <module name="ConstantName">
+            <property name="format" value="(^[A-Z][A-Z0-9]*(_[A-Z0-9]+)*$)|(^log$)"/>
+        </module>
+        <!--Checks that local, non-final variable names conform to producerGroup format specified by the format property-->
+        <module name="LocalVariableName"/>
+        <!--Validates identifiers for local, final variables, including catch parameters-->
+        <module name="LocalFinalVariableName"/>
+        <!--Validates identifiers for non-static fields-->
+        <module name="MemberName"/>
+        <!--Validates identifiers for class type parameters-->
+        <module name="ClassTypeParameterName">
+            <property name="format" value="^[A-Z0-9]*$"/>
+        </module>
+        <!--Validates identifiers for method type parameters-->
+        <module name="MethodTypeParameterName">
+            <property name="format" value="^[A-Z0-9]*$"/>
+        </module>
+        <module name="PackageName"/>
+        <module name="ParameterName"/>
+        <module name="StaticVariableName"/>
+        <module name="TypeName"/>
+        <!--Checks that there are no import statements that use the * notation-->
+        <module name="AvoidStarImport"/>
+
+        <!--whitespace-->
+        <module name="GenericWhitespace"/>
+        <module name="NoWhitespaceBefore"/>
+        <module name="WhitespaceAfter"/>
+        <module name="NoWhitespaceAfter"/>
+        <module name="WhitespaceAround">
+            <property name="allowEmptyConstructors" value="true"/>
+            <property name="allowEmptyMethods" value="true"/>
+        </module>
+        <module name="Indentation"/>
+        <module name="MethodParamPad"/>
+        <module name="ParenPad"/>
+        <module name="TypecastParenPad"/>
+    </module>
+</module>

[rocketmq-connect] 03/05: fix hudi connect config

Posted by zh...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

zhoubo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-connect.git

commit 5da4b78705108ac6d260283cd38f9be08d2590b9
Author: duhenglucky <du...@apache.org>
AuthorDate: Sun Dec 5 21:26:51 2021 +0800

    fix hudi connect config
---
 README.md | 7 +++----
 1 file changed, 3 insertions(+), 4 deletions(-)

diff --git a/README.md b/README.md
index 509f3b2..cadc364 100644
--- a/README.md
+++ b/README.md
@@ -44,10 +44,9 @@ http://${runtime-ip}:${runtime-port}/connectors/${rocketmq-hudi-connector-name}/
 |topicNames | String | 是 | rocketmq默认每一个数据源中的表对应一个名字,该名称需和数据库表名称相同 | jdbc_hudi |
 |task-divide-strategy | Integer | 否 | task 分配策略, 默认值为 0,表示按照topic分配任务,每一个table便是一个topic | 0 |
 |task-parallelism | Integer | 否 | task parallelism,默认值为 1,表示将topic拆分为多少个任务进行执行 | 2 |
-|source-cluster | String | 是 | sink 端获取路由信息连接到的RocketMQ nameserver 地址 | 172.17.0.1:10911 |
-|source-rocketmq | String | 是 | sink 端获取路由信息连接到的RocketMQ broker cluster 地址 | 127.0.0.1:9876 |
+|source-cluster | String | 是 | sink 端 RocketMQ cluster 名称 | DefaultCluster |
+|source-rocketmq | String | 是 | sink 端获取路由信息连接到的 RocketMQ nameserver 地址 | 127.0.0.1:9876 |
 |source-record-converter | String | 是 | source data 解析 | org.apache.rocketmq.connect.runtime.converter.RocketMQConverter |
-|src-cluster | String | 否 | 源集群 | DefaultCluster |
 |refresh-interval | String | 否 | sink的刷新时间,单位ms | 10000 |
 |schemaPath | String | 是 | sink的schema地址 | /Users/osgoo/Downloads/user.avsc" |
 
@@ -64,7 +63,7 @@ http://${runtime-ip}:${runtime-port}/connectors/${rocketmq-hudi-connector-name}/
 	"deleteParallelism": "2",
 	"source-record-converter": "org.apache.rocketmq.connect.runtime.converter.RocketMQConverter",
 	"source-rocketmq": "127.0.0.1:9876",
-	"src-cluster": "DefaultCluster",
+	"source-cluster": "DefaultCluster",
 	"refresh-interval": "10000",
 	"schemaPath": "/Users/osgoo/Downloads/user.avsc"
 }