You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by zh...@apache.org on 2022/03/01 14:02:09 UTC

[rocketmq-connect] 01/02: [ISSUE #570] ASoC runtime optimization: Cassandra connectors (#587)

This is an automated email from the ASF dual-hosted git repository.

zhoubo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-connect.git

commit e2cc843ef4926a98797ca76880579941d5363fc6
Author: affe <af...@gmail.com>
AuthorDate: Tue Jul 28 11:41:32 2020 +0800

    [ISSUE #570] ASoC runtime optimization: Cassandra connectors (#587)
    
    * feat(connect-cassanra) merge affe's cassandra connector implementation
    
    * Merge master into current branch : deleted .iml in connect-jms
    
    * style(connect-cassandra): resolve all TODOs and meaningless comments
---
 README.md                                          |  97 +++++++
 pom.xml                                            | 276 ++++++++++++++++++++
 scripts/gen_data.py                                | 155 +++++++++++
 scripts/requirements.txt                           |   2 +
 .../connect/cassandra/common/CloneUtils.java       |  44 ++++
 .../connect/cassandra/common/ConstDefine.java      |  23 ++
 .../rocketmq/connect/cassandra/common/DBUtils.java |  91 +++++++
 .../connect/cassandra/common/DataType.java         |  26 ++
 .../rocketmq/connect/cassandra/common/Utils.java   |  76 ++++++
 .../rocketmq/connect/cassandra/config/Config.java  | 282 +++++++++++++++++++++
 .../connect/cassandra/config/ConfigUtil.java       |  70 +++++
 .../cassandra/config/DbConnectorConfig.java        | 110 ++++++++
 .../cassandra/config/SinkDbConnectorConfig.java    | 112 ++++++++
 .../cassandra/config/SourceDbConnectorConfig.java  |  87 +++++++
 .../connect/cassandra/config/TaskDivideConfig.java | 123 +++++++++
 .../connect/cassandra/config/TaskTopicInfo.java    |  40 +++
 .../connector/CassandraSinkConnector.java          | 240 ++++++++++++++++++
 .../cassandra/connector/CassandraSinkTask.java     | 161 ++++++++++++
 .../connector/CassandraSourceConnector.java        | 108 ++++++++
 .../cassandra/connector/CassandraSourceTask.java   | 168 ++++++++++++
 .../connect/cassandra/schema/Database.java         | 140 ++++++++++
 .../rocketmq/connect/cassandra/schema/Schema.java  | 146 +++++++++++
 .../rocketmq/connect/cassandra/schema/Table.java   | 103 ++++++++
 .../schema/column/BigIntColumnParser.java          |  50 ++++
 .../schema/column/BooleanColumnParser.java         |  34 +++
 .../cassandra/schema/column/ColumnParser.java      | 118 +++++++++
 .../schema/column/DateTimeColumnParser.java        |  53 ++++
 .../schema/column/DefaultColumnParser.java         |  37 +++
 .../cassandra/schema/column/EnumColumnParser.java  |  46 ++++
 .../cassandra/schema/column/IntColumnParser.java   |  66 +++++
 .../cassandra/schema/column/SetColumnParser.java   |  54 ++++
 .../schema/column/StringColumnParser.java          |  57 +++++
 .../cassandra/schema/column/TimeColumnParser.java  |  39 +++
 .../cassandra/schema/column/YearColumnParser.java  |  40 +++
 .../rocketmq/connect/cassandra/sink/Updater.java   | 216 ++++++++++++++++
 .../rocketmq/connect/cassandra/source/Querier.java | 164 ++++++++++++
 .../cassandra/strategy/DivideStrategyEnum.java     |  23 ++
 .../cassandra/strategy/DivideTaskByTopic.java      | 110 ++++++++
 .../cassandra/strategy/TaskDivideStrategy.java     |  32 +++
 39 files changed, 3819 insertions(+)

diff --git a/README.md b/README.md
new file mode 100644
index 0000000..6439269
--- /dev/null
+++ b/README.md
@@ -0,0 +1,97 @@
+# rocketmq-connect-cassandra
+
+## rocketmq-connect-cassandra 打包
+```
+mvn clean install -DskipTest -U 
+```
+
+## 目前安装会遇到的问题
+
+目前的rocketmq-connect-cassandra 使用的是datastax-java-driver:4.5.0版本的cassandra-driver,由于在打包过程中还有没有解决的问题,该cassandra driver无法读取
+位于driver包中的默认配置文件,因此我们需要手动下载cassandra driver的配置文件[reference.conf](https://github.com/datastax/java-driver/blob/4.5.0/core/src/main/resources/reference.conf) 并将其放置于classpath中。
+
+该问题还仍然在解决的过程中。
+
+## rocketmq-connect-cassandra 启动
+
+* **cassandra-source-connector** 启动
+
+```
+http://${runtime-ip}:${runtime-port}/connectors/${rocketmq-cassandra-source-connector-name}
+?config={"connector-class":"org.apache.rocketmq.connect.cassandra.connector.JdbcSourceConnector",“dbUrl”:"${source-db-ip}",dbPort”:"${source-db-port}",dbUsername”:"${source-db-username}",dbPassword”:"${source-db-password}","rocketmqTopic":"cassandraTopic","mode":"bulk","whiteDataBase":{"${source-db-name}":{"${source-table-name}":{"${source-column-name}":"${source-column-value}"}}},"source-record-converter":"org.apache.rocketmq.connect.runtime.converter.JsonConverter"}
+```
+
+* **cassandra-sink-connector** 启动
+
+```
+http://${runtime-ip}:${runtime-port}/connectors/${rocketmq-cassandra-sink-connector-name}
+?config={"connector-class":"org.apache.rocketmq.connect.cassandra.connector.JdbcSinkConnector",“dbUrl”:"${sink-db-ip}",dbPort”:"${sink-db-port}",dbUsername”:"${sink-db-username}",dbPassword”:"${sink-db-password}","rocketmqTopic":"cassandraTopic","mode":"bulk","topicNames":"${sink-topic-name}","source-record-converter":"org.apache.rocketmq.connect.runtime.converter.JsonConverter"}
+```
+>**注:** `rocketmq-cassandra-connect` 的启动依赖于`rocketmq-connect-runtime`项目的启动,需将打好的`jar`包放置到`runtime`项目中`pluginPaths`配置的路径后再执行上面的启动请求,该值配置在`runtime`项目下的`connect.conf`文件中
+
+## rocketmq-connect-cassandra 停止
+
+```
+http://${runtime-ip}:${runtime-port}/connectors/${rocketmq-cassandra-connector-name}/stop
+```
+
+## rocketmq-connect-cassandra 参数说明
+* **cassandra-source-connector 参数说明**
+
+参数 | 类型 | 是否必须 | 描述 | 样例
+|---|---|---|---|---|
+|dbUrl | String | 是 | source端 DB ip | 192.168.1.2|
+|dbPort | String | 是 | source端 DB port | 3306 |
+|dbUsername | String | 是 | source端 DB 用户名 | root |
+|dbPassword | String | 是 | source端 DB 密码 | 123456 |
+|rocketmqTopic | String | 是 | 待废弃的参数,需和topicNames相同 | jdbc_cassandra |
+|topicNames | String | 是 | rocketmq默认每一个数据源中的表对应一个名字,该名称需和数据库表名称相同 | jdbc_cassandra |
+|whiteDataBase | String | 是 | source端同步数据白名单,嵌套配置,为{DB名:{表名:{字段名:字段值}}},若无指定字段数据同步,字段名可设为NO-FILTER,值为任意 | {"DATABASE_TEST":{"TEST_DATA":{"name":"test"}}} |
+|mode | String | 是 | source-connector 模式,目前仅支持bulk | bulk |
+|localDataCenter | String | 是 | 待废弃 | cassandra 集群的datacenter名称,为必填项 |
+|task-divide-strategy | Integer | 否 | task 分配策略, 默认值为 0,表示按照topic分配任务,每一个table便是一个topic | 0 |
+|task-parallelism | Integer | 否 | task parallelism,默认值为 1,表示将topic拆分为多少个任务进行执行 | 2 |
+|source-cluster | String | 是 | sink 端获取路由信息连接到的RocketMQ nameserver 地址 | 172.17.0.1:10911 |
+|source-rocketmq | String | 是 | sink 端获取路由信息连接到的RocketMQ broker cluster 地址 | 127.0.0.1:9876 |
+|source-record-converter | String | 是 | source data 解析 | org.apache.rocketmq.connect.runtime.converter.JsonConverter |
+
+
+示例配置如下
+```js
+{
+    "connector-class":"org.apache.rocketmq.connect.cassandra.connector.CassandraSourceConnector",
+    "rocketmqTopic":"jdbc_cassandra",
+    "topicNames": "jdbc_cassandra",
+    "dbUrl":"127.0.0.1",
+    "dbPort":"9042",
+    "dbUsername":"cassandra",
+    "dbPassword":"cassandra",
+    "localDataCenter":"datacenter1",
+    "whiteDataBase": {
+        "jdbc":{
+           "jdbc_cassandra": {"NO-FILTER": "10"}
+         }
+      },
+    "mode": "bulk",
+    "task-parallelism": 1,
+    "source-cluster": "172.17.0.1:10911",
+    "source-rocketmq": "127.0.0.1:9876",
+    "source-record-converter":"org.apache.rocketmq.connect.runtime.converter.JsonConverter"
+ }
+```
+* **cassandra-sink-connector 参数说明**
+
+参数 | 类型 | 是否必须 | 描述 | 样例
+|---|---|---|---|---|
+|dbUrl | String | 是 | sink端 DB ip | 192.168.1.2|
+|dbPort | String | 是 | sink端 DB port | 3306 |
+|dbUsername | String | 是 | sink端 DB 用户名 | root |
+|dbPassword | String | 是 | sink端 DB 密码 | 123456 |
+|topicNames | String | 是 | sink端同步数据的topic名字 | topic-1,topic-2 |
+|mode | String | 是 | source-connector 模式,目前仅支持bulk | bulk |
+|~~rocketmqTopic~~ | String | 是 | 待废弃 | cassandraTopic |
+|task-divide-strategy | Integer | 否 | task 分配策略, 默认值为 0,表示按照topic分配任务,每一个table便是一个topic | 0 |
+|task-parallelism | Integer | 否 | task parallelism,默认值为 1,表示将topic拆分为多少个任务进行执行 | 2 |
+|source-rocketmq | String | 是 | sink 端获取路由信息连接到的RocketMQ nameserver 地址 | 172.17.0.1:10911 |
+|source-cluster | String | 是 | sink 端获取路由信息连接到的RocketMQ broker cluster 地址 | 127.0.0.1:9876 |
+|source-record-converter | String | 是 | source data 解析 | org.apache.rocketmq.connect.runtime.converter.JsonConverter |
diff --git a/pom.xml b/pom.xml
new file mode 100644
index 0000000..286a6ef
--- /dev/null
+++ b/pom.xml
@@ -0,0 +1,276 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!-- Licensed to the Apache Software Foundation (ASF) under one or more contributor
+	license agreements. See the NOTICE file distributed with this work for additional
+	information regarding copyright ownership. The ASF licenses this file to
+	You under the Apache License, Version 2.0 (the "License"); you may not use
+	this file except in compliance with the License. You may obtain a copy of
+	the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required
+	by applicable law or agreed to in writing, software distributed under the
+	License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS
+	OF ANY KIND, either express or implied. See the License for the specific
+	language governing permissions and limitations under the License. -->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+    <groupId>org.apache.rocketmq</groupId>
+    <artifactId>rocketmq-connect-cassandra</artifactId>
+    <version>0.0.1-SNAPSHOT</version>
+
+    <name>connect-cassandra</name>
+    <licenses>
+        <license>
+            <name>The Apache Software License, Version 2.0</name>
+            <url>http://www.apache.org/licenses/LICENSE-2.0.txt</url>
+        </license>
+    </licenses>
+
+    <issueManagement>
+        <system>jira</system>
+        <url>https://issues.apache.org/jira/browse/RocketMQ</url>
+    </issueManagement>
+
+    <properties>
+        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+        <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
+
+        <!-- Compiler settings properties -->
+        <maven.compiler.source>1.8</maven.compiler.source>
+        <maven.compiler.target>1.8</maven.compiler.target>
+        <rocketmq.version>4.5.2</rocketmq.version>
+    </properties>
+
+    <build>
+        <plugins>
+            <plugin>
+                <groupId>org.codehaus.mojo</groupId>
+                <artifactId>versions-maven-plugin</artifactId>
+                <version>2.3</version>
+            </plugin>
+            <plugin>
+                <groupId>org.codehaus.mojo</groupId>
+                <artifactId>clirr-maven-plugin</artifactId>
+                <version>2.7</version>
+            </plugin>
+            <plugin>
+                <artifactId>maven-dependency-plugin</artifactId>
+                <configuration>
+                    <outputDirectory>${project.build.directory}/lib</outputDirectory>
+                    <excludeTransitive>false</excludeTransitive>
+                    <stripVersion>true</stripVersion>
+                </configuration>
+            </plugin>
+            <plugin>
+                <artifactId>maven-compiler-plugin</artifactId>
+                <version>3.6.1</version>
+                <configuration>
+                    <source>${maven.compiler.source}</source>
+                    <target>${maven.compiler.target}</target>
+                    <compilerVersion>${maven.compiler.source}</compilerVersion>
+                    <showDeprecation>true</showDeprecation>
+                    <showWarnings>true</showWarnings>
+                </configuration>
+            </plugin>
+            <plugin>
+                <artifactId>maven-surefire-plugin</artifactId>
+                <version>2.19.1</version>
+                <configuration>
+                    <argLine>-Xms512m -Xmx1024m</argLine>
+                    <forkMode>always</forkMode>
+                    <includes>
+                        <include>**/*Test.java</include>
+                    </includes>
+                </configuration>
+            </plugin>
+            <plugin>
+                <artifactId>maven-site-plugin</artifactId>
+                <version>3.6</version>
+                <configuration>
+                    <locales>en_US</locales>
+                    <outputEncoding>UTF-8</outputEncoding>
+                    <inputEncoding>UTF-8</inputEncoding>
+                </configuration>
+            </plugin>
+            <plugin>
+                <artifactId>maven-source-plugin</artifactId>
+                <version>3.0.1</version>
+                <executions>
+                    <execution>
+                        <id>attach-sources</id>
+                        <goals>
+                            <goal>jar</goal>
+                        </goals>
+                    </execution>
+                </executions>
+            </plugin>
+            <plugin>
+                <groupId>org.apache.rat</groupId>
+                <artifactId>apache-rat-plugin</artifactId>
+                <version>0.12</version>
+                <configuration>
+                    <excludes>
+                        <exclude>README.md</exclude>
+                    </excludes>
+                </configuration>
+            </plugin>
+            <plugin>
+                <artifactId>maven-javadoc-plugin</artifactId>
+                <version>2.10.4</version>
+                <configuration>
+                    <charset>UTF-8</charset>
+                    <locale>en_US</locale>
+                    <excludePackageNames>io.openmessaging.internal</excludePackageNames>
+                </configuration>
+                <executions>
+                    <execution>
+                        <id>aggregate</id>
+                        <goals>
+                            <goal>aggregate</goal>
+                        </goals>
+                        <phase>site</phase>
+                    </execution>
+                </executions>
+            </plugin>
+            <plugin>
+                <artifactId>maven-resources-plugin</artifactId>
+                <version>3.0.2</version>
+                <configuration>
+                    <encoding>${project.build.sourceEncoding}</encoding>
+                </configuration>
+            </plugin>
+            <plugin>
+                <groupId>org.codehaus.mojo</groupId>
+                <artifactId>findbugs-maven-plugin</artifactId>
+                <version>3.0.4</version>
+            </plugin>
+            <plugin>
+                <artifactId>maven-assembly-plugin</artifactId>
+                <version>3.0.0</version>
+                <configuration>
+                    <archive>
+                        <!-- The Main Class Here doesn't make a lot sense since it was dynamically loaded-->
+                        <manifest>
+                            <mainClass>org.apache.rocketmq.connect.cassandra.connector.CassandraSinkConnector</mainClass>
+                        </manifest>
+                    </archive>
+                    <descriptorRefs>
+                        <descriptorRef>jar-with-dependencies</descriptorRef>
+                    </descriptorRefs>
+                </configuration>
+                <executions>
+                    <execution>
+                        <id>make-assembly</id>
+                        <phase>package</phase>
+                        <goals>
+                            <goal>single</goal>
+                        </goals>
+                    </execution>
+                </executions>
+            </plugin>
+        </plugins>
+    </build>
+
+    <dependencies>
+        <dependency>
+            <groupId>junit</groupId>
+            <artifactId>junit</artifactId>
+            <version>4.11</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.assertj</groupId>
+            <artifactId>assertj-core</artifactId>
+            <version>2.6.0</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.mockito</groupId>
+            <artifactId>mockito-core</artifactId>
+            <version>2.6.3</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>commons-codec</groupId>
+            <artifactId>commons-codec</artifactId>
+            <version>1.12</version>
+        </dependency>
+        <dependency>
+            <groupId>io.openmessaging</groupId>
+            <artifactId>openmessaging-connector</artifactId>
+            <version>0.1.1</version>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
+            <groupId>io.openmessaging</groupId>
+            <artifactId>openmessaging-api</artifactId>
+            <version>0.3.1-alpha</version>
+        </dependency>
+        <dependency>
+            <groupId>com.alibaba</groupId>
+            <artifactId>fastjson</artifactId>
+            <version>1.2.60</version>
+        </dependency>
+        <dependency>
+            <groupId>org.slf4j</groupId>
+            <artifactId>slf4j-api</artifactId>
+            <version>1.7.7</version>
+        </dependency>
+        <dependency>
+            <groupId>ch.qos.logback</groupId>
+            <artifactId>logback-classic</artifactId>
+            <version>1.0.13</version>
+        </dependency>
+        <dependency>
+            <groupId>ch.qos.logback</groupId>
+            <artifactId>logback-core</artifactId>
+            <version>1.0.13</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.rocketmq</groupId>
+            <artifactId>rocketmq-client</artifactId>
+            <version>${rocketmq.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.rocketmq</groupId>
+            <artifactId>rocketmq-tools</artifactId>
+            <version>${rocketmq.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.rocketmq</groupId>
+            <artifactId>rocketmq-remoting</artifactId>
+            <version>${rocketmq.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.rocketmq</groupId>
+            <artifactId>rocketmq-openmessaging</artifactId>
+            <version>4.3.2</version>
+        </dependency>
+        <dependency>
+            <groupId>commons-cli</groupId>
+            <artifactId>commons-cli</artifactId>
+            <version>1.2</version>
+        </dependency>
+        <dependency>
+            <groupId>io.javalin</groupId>
+            <artifactId>javalin</artifactId>
+            <version>1.3.0</version>
+        </dependency>
+        <dependency>
+            <groupId>com.alibaba</groupId>
+            <artifactId>druid</artifactId>
+            <version>1.0.31</version>
+        </dependency>
+        <dependency>
+            <groupId>com.datastax.oss</groupId>
+            <artifactId>java-driver-core-shaded</artifactId>
+            <version>4.5.1</version>
+        </dependency>
+        <dependency>
+            <groupId>com.datastax.oss</groupId>
+            <artifactId>java-driver-query-builder</artifactId>
+            <version>4.5.1</version>
+        </dependency>
+    </dependencies>
+
+</project>
diff --git a/scripts/gen_data.py b/scripts/gen_data.py
new file mode 100644
index 0000000..8fde504
--- /dev/null
+++ b/scripts/gen_data.py
@@ -0,0 +1,155 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+
+from sqlalchemy import *
+from sqlalchemy.ext.declarative import declarative_base
+from sqlalchemy import Column, String
+from sqlalchemy.orm import sessionmaker
+import argparse
+import random
+import string
+import datetime
+import time
+import uuid
+
+
+
+Base = declarative_base()
+class JdbcCassandra(Base):
+    __tablename__ = 'jdbc_cassandra'
+    int_type = Column(INT, primary_key=True, autoincrement=True)
+    ascii_type = Column(VARCHAR(50))
+    boolean_type = Column(BOOLEAN)
+    date_type = Column(DATE)
+    decimal_type = Column(DECIMAL)
+    double_type = Column(FLOAT)
+    float_type = Column(FLOAT)
+    inet_type = Column(VARCHAR(50))
+    smallint_type = Column(SMALLINT)
+    time_type  = Column(TIME)
+    text_type = Column(TEXT)
+    timestamp_type  = Column(TIMESTAMP)
+    #timeuuid_type  = Column(CHAR(36))
+    tinyint_type  = Column(SMALLINT)
+    #uuid_type  = Column(CHAR(36))
+    varchar_type  = Column(VARCHAR(50))
+    varint_type  = Column(BIGINT)
+
+
+
+def random_string(strlen=8):
+    letters = string.ascii_lowercase
+    return bytes(''.join(random.choice(letters) for i in range(strlen)), 'utf8') 
+
+
+def str_time_prop(start, end, format, prop):
+    """Get a time at a proportion of a range of two formatted times.
+
+    start and end should be strings specifying times formated in the
+    given format (strftime-style), giving an interval [start, end].
+    prop specifies how a proportion of the interval to be taken after
+    start.  The returned time will be in the specified format.
+    """
+
+    stime = time.mktime(time.strptime(start, format))
+    etime = time.mktime(time.strptime(end, format))
+
+    ptime = stime + prop * (etime - stime)
+
+    return time.strftime(format, time.localtime(ptime))
+
+
+def random_date(start, end, prop):
+    return bytes(str_time_prop(start, end, '%m/%d/%Y %I:%M %p', prop), 'utf8')
+
+def random_time(start, end, prop):
+    return bytes(str_time_prop(start, end, '%Y-%m-%d %H:%M:%S', prop), 'utf8')
+
+def format_time():
+    t = datetime.datetime.now()
+    s = t.strftime('%Y-%m-%d %H:%M:%S.%f')
+    return s[:-3]
+
+def main():
+    
+    # define parser
+    parser = argparse.ArgumentParser()
+    parser.add_argument('hostname', metavar='HOSTNAME', type=str,
+                        help='hostname of target mysql database ')
+    parser.add_argument('port', metavar='PORT', type=str,
+                        help='port of target mysql database')
+    parser.add_argument('username', metavar='USERNAME', type=str,
+                        help='username of the user connecting to mysql')
+    parser.add_argument('password', metavar='PASSWORD', type=str,
+                        help='password of specified user')   
+    parser.add_argument('database', metavar='DATABASE', type=str,
+                        help='which database to connect to') 
+    parser.add_argument('count', metavar='COUNT', type=int,
+                        help='how many random records to insert into the database')                                                           
+    args = parser.parse_args()
+
+
+    # get variabless from command line
+    hostname = args.hostname
+    port = args.port
+    username = args.username
+    password = args.password
+    database = args.database
+    count = args.count
+
+
+    # create db connection
+    # print("----------------ERROR-------------")
+    # print("mysql+pymysql://{}:{}@{}:{}/{}".format(username, password, hostname, port, database))
+    engine = create_engine("mysql+pymysql://{}:{}@{}:{}/{}".format(username, password, hostname, port, database), echo=True)
+
+
+    # create table if not exist
+    Base.metadata.create_all(engine)    
+
+    # create a session
+    Session = sessionmaker(bind=engine)
+    session = Session()
+
+    for i in range(0, count):
+        random_record = JdbcCassandra(
+            ascii_type = random_string(30),
+            boolean_type = (i % 2 == 0),
+            date_type = datetime.datetime.now().date(),
+            decimal_type = 10.5,
+            double_type = 1.5,
+            float_type = 8.3,
+            inet_type = "127.0.0.1",
+            smallint_type = 1,
+            time_type = datetime.datetime.now().time(),
+            text_type = random_string(30),
+            timestamp_type = datetime.datetime.now(),
+            #timeuuid_type = str(uuid.uuid1()),
+            tinyint_type = 1,
+            #uuid_type = str(uuid.uuid1()),
+            varchar_type = random_string(30),
+            varint_type = random.getrandbits(63),
+        )
+        session.add(random_record)
+
+    session.commit()
+
+
+if __name__ == "__main__":
+    main()
\ No newline at end of file
diff --git a/scripts/requirements.txt b/scripts/requirements.txt
new file mode 100644
index 0000000..fc7dc31
--- /dev/null
+++ b/scripts/requirements.txt
@@ -0,0 +1,2 @@
+SQLAlchemy==1.3.16
+PyMySQL==0.9.3
\ No newline at end of file
diff --git a/src/main/java/org/apache/rocketmq/connect/cassandra/common/CloneUtils.java b/src/main/java/org/apache/rocketmq/connect/cassandra/common/CloneUtils.java
new file mode 100644
index 0000000..c860750
--- /dev/null
+++ b/src/main/java/org/apache/rocketmq/connect/cassandra/common/CloneUtils.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.connect.cassandra.common;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.io.Serializable;
+
+public class CloneUtils {
+    @SuppressWarnings("unchecked")
+    public static <T extends Serializable> T clone(T obj) {
+        T clonedObj = null;
+        try {
+            ByteArrayOutputStream baos = new ByteArrayOutputStream();
+            ObjectOutputStream oos = new ObjectOutputStream(baos);
+            oos.writeObject(obj);
+            oos.close();
+
+            ByteArrayInputStream bais = new ByteArrayInputStream(baos.toByteArray());
+            ObjectInputStream ois = new ObjectInputStream(bais);
+            clonedObj = (T) ois.readObject();
+            ois.close();
+        } catch (Exception e) {
+            e.printStackTrace();
+        }
+        return clonedObj;
+    }
+}
diff --git a/src/main/java/org/apache/rocketmq/connect/cassandra/common/ConstDefine.java b/src/main/java/org/apache/rocketmq/connect/cassandra/common/ConstDefine.java
new file mode 100644
index 0000000..462add2
--- /dev/null
+++ b/src/main/java/org/apache/rocketmq/connect/cassandra/common/ConstDefine.java
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.connect.cassandra.common;
+
+public class ConstDefine {
+
+    public static String CASSANDRA_CONNECTOR_ADMIN_PREFIX = "CASSANDRA-CONNECTOR-ADMIN";
+    public static final String PREFIX = "cassandra";
+}
diff --git a/src/main/java/org/apache/rocketmq/connect/cassandra/common/DBUtils.java b/src/main/java/org/apache/rocketmq/connect/cassandra/common/DBUtils.java
new file mode 100644
index 0000000..bd58eea
--- /dev/null
+++ b/src/main/java/org/apache/rocketmq/connect/cassandra/common/DBUtils.java
@@ -0,0 +1,91 @@
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.rocketmq.connect.cassandra.common;
+
+import com.datastax.oss.driver.api.core.CqlSession;
+import com.datastax.oss.driver.api.core.CqlSessionBuilder;
+import com.datastax.oss.driver.api.core.config.DefaultDriverOption;
+import com.datastax.oss.driver.api.core.config.DriverConfigLoader;
+import com.datastax.oss.driver.internal.core.auth.PlainTextAuthProvider;
+import io.netty.util.concurrent.SingleThreadEventExecutor;
+import java.io.File;
+import java.net.InetSocketAddress;
+import java.time.Duration;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import org.apache.rocketmq.connect.cassandra.config.Config;
+import org.apache.rocketmq.connect.cassandra.connector.CassandraSinkTask;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.text.SimpleDateFormat;
+import java.util.*;
+import java.util.Date;
+
+public class DBUtils {
+
+    private static final Logger log = LoggerFactory.getLogger(CassandraSinkTask.class);
+
+    public static CqlSession initCqlSession(Config config) throws Exception {
+        log.info("Trying to init Cql Session ");
+        Map<String, String> map = new HashMap<>();
+
+        String dbUrl = config.getDbUrl();
+        String dbPort = config.getDbPort();
+        String localDataCenter = config.getLocalDataCenter();
+        String username =  config.getDbUsername();
+        String password =  config.getDbPassword();
+
+//        sessionBuilder.addContactPoint(new InetSocketAddress(dbUrl, Integer.parseInt(dbPort)))
+//                      .withAuthCredentials(username, password);
+
+
+        log.info("Cassandra dbUrl: {}", dbUrl);
+        log.info("Cassandra dbPort: {}", dbPort);
+        log.info("Cassandra datacenter: {}", localDataCenter);
+        log.info("Cassandra username: {}", username);
+        log.info("Cassandra password: {}", password);
+
+        CqlSession cqlSession = null;
+        log.info("Using Program Config Loader");
+        try {
+            ExecutorService executorService = Executors.newSingleThreadExecutor();
+            Future<CqlSession> handle = executorService.submit(new Callable<CqlSession>() {
+                @Override
+                public CqlSession call() {
+                    return CqlSession.builder()
+                            .addContactPoint(new InetSocketAddress(dbUrl, Integer.valueOf(dbPort)))
+                            .withLocalDatacenter(localDataCenter)
+                            .build();
+                }
+            });
+
+            cqlSession = handle.get();
+
+        } catch (Exception e) {
+            log.info("error when creating cqlSession {}", e.getMessage());
+            e.printStackTrace();
+        }
+        log.info("init Cql Session success");
+
+        return cqlSession;
+    }
+}
diff --git a/src/main/java/org/apache/rocketmq/connect/cassandra/common/DataType.java b/src/main/java/org/apache/rocketmq/connect/cassandra/common/DataType.java
new file mode 100644
index 0000000..d6f814f
--- /dev/null
+++ b/src/main/java/org/apache/rocketmq/connect/cassandra/common/DataType.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.connect.cassandra.common;
+
+public enum DataType {
+
+    COMMON_MESSAGE,
+    TOPIC_CONFIG,
+    BROKER_CONFIG,
+    SUB_CONFIG,
+    OFFSET
+}
diff --git a/src/main/java/org/apache/rocketmq/connect/cassandra/common/Utils.java b/src/main/java/org/apache/rocketmq/connect/cassandra/common/Utils.java
new file mode 100644
index 0000000..0911e20
--- /dev/null
+++ b/src/main/java/org/apache/rocketmq/connect/cassandra/common/Utils.java
@@ -0,0 +1,76 @@
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.rocketmq.connect.cassandra.common;
+
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.rocketmq.client.exception.MQClientException;
+import org.apache.rocketmq.common.protocol.route.BrokerData;
+import org.apache.rocketmq.common.protocol.route.TopicRouteData;
+import org.apache.rocketmq.remoting.exception.RemotingException;
+import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+public class Utils {
+    private static final Logger log = LoggerFactory.getLogger(Utils.class);
+
+    public static String createGroupName(String prefix) {
+        return new StringBuilder().append(prefix).append("-").append(System.currentTimeMillis()).toString();
+    }
+
+    public static String createGroupName(String prefix, String postfix) {
+        return new StringBuilder().append(prefix).append("-").append(postfix).toString();
+    }
+
+    public static String createTaskId(String prefix) {
+        return new StringBuilder().append(prefix).append("-").append(System.currentTimeMillis()).toString();
+    }
+
+    public static String createInstanceName(String namesrvAddr) {
+        String[] namesrvArray = namesrvAddr.split(";");
+        List<String> namesrvList = new ArrayList<>();
+        for (String ns : namesrvArray) {
+            if (!namesrvList.contains(ns)) {
+                namesrvList.add(ns);
+            }
+        }
+        Collections.sort(namesrvList);
+        return String.valueOf(namesrvList.toString().hashCode());
+    }
+
+    public static List<BrokerData> examineBrokerData(DefaultMQAdminExt defaultMQAdminExt, String topic,
+        String cluster) throws RemotingException, MQClientException, InterruptedException {
+        List<BrokerData> brokerList = new ArrayList<>();
+
+        TopicRouteData topicRouteData = defaultMQAdminExt.examineTopicRouteInfo(topic);
+        if (topicRouteData.getBrokerDatas() != null) {
+            for (BrokerData broker : topicRouteData.getBrokerDatas()) {
+                if (StringUtils.equals(broker.getCluster(), cluster)) {
+                    brokerList.add(broker);
+                }
+            }
+        }
+        return brokerList;
+    }
+}
diff --git a/src/main/java/org/apache/rocketmq/connect/cassandra/config/Config.java b/src/main/java/org/apache/rocketmq/connect/cassandra/config/Config.java
new file mode 100644
index 0000000..b9b115e
--- /dev/null
+++ b/src/main/java/org/apache/rocketmq/connect/cassandra/config/Config.java
@@ -0,0 +1,282 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.connect.cassandra.config;
+
+import com.alibaba.fastjson.JSONObject;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+public class Config {
+    private static final Logger LOG = LoggerFactory.getLogger(Config.class);
+
+    /* Database Connection Config */
+    private String dbUrl;
+    private String dbPort;
+    private String localDataCenter;
+    private String dbUsername;
+    private String dbPassword;
+    private String dataType;
+    private String rocketmqTopic;
+
+    private List tableWhitelist;
+    private List tableBlacklist;
+    private String whiteDataBase;
+    private String whiteTable;
+
+
+    public static final String CONN_TASK_PARALLELISM = "task-parallelism";
+    public static final String CONN_TASK_DIVIDE_STRATEGY = "task-divide-strategy";
+    public static final String CONN_WHITE_LIST = "whiteDataBase";
+    public static final String CONN_SOURCE_RECORD_CONVERTER = "source-record-converter";
+    public static final String CONN_DB_IP = "dbUrl";
+    public static final String CONN_DB_PORT = "dbPort";
+    public static final String CONN_DB_USERNAME = "dbUsername";
+    public static final String CONN_DB_PASSWORD = "dbPassword";
+    public static final String CONN_DB_DATACENTER = "localDataCenter";
+    public static final String CONN_DATA_TYPE = "dataType";
+    public static final String CONN_TOPIC_NAMES = "topicNames";
+    public static final String CONN_DB_MODE = "mode";
+
+    public static final String CONN_SOURCE_RMQ = "source-rocketmq";
+    public static final String CONN_SOURCE_CLUSTER = "source-cluster";
+    public static final String REFRESH_INTERVAL = "refresh.interval";
+
+    /* Mode Config */
+    private String mode = "";
+    private String incrementingColumnName = "";
+    private String query = "";
+    private String timestampColmnName = "";
+    private boolean validateNonNull = true;
+
+    /*Connector config*/
+    private String tableTypes = "table";
+    private long pollInterval = 5000;
+    private int batchMaxRows = 100;
+    private long tablePollInterval = 60000;
+    private long timestampDelayInterval = 0;
+    private String dbTimezone = "GMT+8";
+    private String queueName;
+
+    private Logger log = LoggerFactory.getLogger(Config.class);
+    public static final Set<String> REQUEST_CONFIG = new HashSet<String>() {
+        {
+            add("dbUrl");
+            add("dbPort");
+            add("localDataCenter");
+            add("dbUsername");
+            add("dbPassword");
+            add("mode");
+            add("rocketmqTopic");
+        }
+    };
+
+
+
+    public static Logger getLOG() {
+        return LOG;
+    }
+
+    public String getDbUrl() { return dbUrl; }
+
+    public void setDbUrl(String dbUrl) { this.dbUrl = dbUrl; }
+
+    public String getDbPort() { return dbPort; }
+
+    public void setDbPort(String dbPort) { this.dbPort = dbPort; }
+
+    public String getLocalDataCenter() {
+        return localDataCenter;
+    }
+
+    public void setLocalDataCenter(String localDataCenter) {
+        this.localDataCenter = localDataCenter;
+    }
+
+    public String getDbUsername() {
+        return dbUsername;
+    }
+
+    public void setDbUsername(String dbUsername) {
+        this.dbUsername = dbUsername;
+    }
+
+    public String getDbPassword() {
+        return dbPassword;
+    }
+
+    public void setDbPassword(String dbPassword) {
+        this.dbPassword = dbPassword;
+    }
+
+    public String getDataType() {
+        return dataType;
+    }
+
+    public void setDataType(String dataType) {
+        this.dataType = dataType;
+    }
+
+    public String getRocketmqTopic() {
+        return rocketmqTopic;
+    }
+
+    public void setRocketmqTopic(String rocketmqTopic) {
+        this.rocketmqTopic = rocketmqTopic;
+    }
+
+    public String getMode() {
+        return mode;
+    }
+
+    public void setMode(String mode) {
+        this.mode = mode;
+    }
+
+    public String getIncrementingColumnName() {
+        return incrementingColumnName;
+    }
+
+    public void setIncrementingColumnName(String incrementingColumnName) {
+        this.incrementingColumnName = incrementingColumnName;
+    }
+
+    public String getQuery() {
+        return query;
+    }
+
+    public void setQuery(String query) {
+        this.query = query;
+    }
+
+    public String getTimestampColmnName() {
+        return timestampColmnName;
+    }
+
+    public void setTimestampColmnName(String timestampColmnName) {
+        this.timestampColmnName = timestampColmnName;
+    }
+
+    public boolean isValidateNonNull() {
+        return validateNonNull;
+    }
+
+    public void setValidateNonNull(boolean validateNonNull) {
+        this.validateNonNull = validateNonNull;
+    }
+
+    public String getTableTypes() {
+        return tableTypes;
+    }
+
+    public void setTableTypes(String tableTypes) {
+        this.tableTypes = tableTypes;
+    }
+
+    public long getPollInterval() {
+        return pollInterval;
+    }
+
+    public void setPollInterval(long pollInterval) {
+        this.pollInterval = pollInterval;
+    }
+
+    public int getBatchMaxRows() {
+        return batchMaxRows;
+    }
+
+    public void setBatchMaxRows(int batchMaxRows) {
+        this.batchMaxRows = batchMaxRows;
+    }
+
+    public long getTablePollInterval() {
+        return tablePollInterval;
+    }
+
+    public void setTablePollInterval(long tablePollInterval) {
+        this.tablePollInterval = tablePollInterval;
+    }
+
+    public long getTimestampDelayInterval() {
+        return timestampDelayInterval;
+    }
+
+    public void setTimestampDelayInterval(long timestampDelayInterval) {
+        this.timestampDelayInterval = timestampDelayInterval;
+    }
+
+    public String getDbTimezone() {
+        return dbTimezone;
+    }
+
+    public void setDbTimezone(String dbTimezone) {
+        this.dbTimezone = dbTimezone;
+    }
+
+    public String getQueueName() {
+        return queueName;
+    }
+
+    public void setQueueName(String queueName) {
+        this.queueName = queueName;
+    }
+
+    public Logger getLog() {
+        return log;
+    }
+
+    public void setLog(Logger log) {
+        this.log = log;
+    }
+
+    public List getTableWhitelist() {
+        return tableWhitelist;
+    }
+
+    public void setTableWhitelist(List tableWhitelist) {
+        this.tableWhitelist = tableWhitelist;
+    }
+
+    public List getTableBlacklist() {
+        return tableBlacklist;
+    }
+
+    public void setTableBlacklist(List tableBlacklist) {
+        this.tableBlacklist = tableBlacklist;
+    }
+
+    public String getWhiteDataBase() {
+        return whiteDataBase;
+    }
+
+    public void setWhiteDataBase(String whiteDataBase) {
+        this.whiteDataBase = whiteDataBase;
+    }
+
+    public String getWhiteTable() {
+        return whiteTable;
+    }
+
+    public void setWhiteTable(String whiteTable) {
+        this.whiteTable = whiteTable;
+    }
+
+}
\ No newline at end of file
diff --git a/src/main/java/org/apache/rocketmq/connect/cassandra/config/ConfigUtil.java b/src/main/java/org/apache/rocketmq/connect/cassandra/config/ConfigUtil.java
new file mode 100644
index 0000000..1c08fb2
--- /dev/null
+++ b/src/main/java/org/apache/rocketmq/connect/cassandra/config/ConfigUtil.java
@@ -0,0 +1,70 @@
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.rocketmq.connect.cassandra.config;
+
+import io.openmessaging.KeyValue;
+import java.lang.reflect.Method;
+
+public class ConfigUtil {
+    public static <T> void load(KeyValue props, Object object) {
+
+        properties2Object(props, object);
+    }
+
+    private static <T> void properties2Object(final KeyValue p, final Object object) {
+
+        Method[] methods = object.getClass().getMethods();
+        for (Method method : methods) {
+            String mn = method.getName();
+            if (mn.startsWith("set")) {
+                try {
+                    String tmp = mn.substring(4);
+                    String first = mn.substring(3, 4);
+
+                    String key = first.toLowerCase() + tmp;
+                    String property = p.getString(key);
+                    if (property != null) {
+                        Class<?>[] pt = method.getParameterTypes();
+                        if (pt != null && pt.length > 0) {
+                            String cn = pt[0].getSimpleName();
+                            Object arg;
+                            if (cn.equals("int") || cn.equals("Integer")) {
+                                arg = Integer.parseInt(property);
+                            } else if (cn.equals("long") || cn.equals("Long")) {
+                                arg = Long.parseLong(property);
+                            } else if (cn.equals("double") || cn.equals("Double")) {
+                                arg = Double.parseDouble(property);
+                            } else if (cn.equals("boolean") || cn.equals("Boolean")) {
+                                arg = Boolean.parseBoolean(property);
+                            } else if (cn.equals("float") || cn.equals("Float")) {
+                                arg = Float.parseFloat(property);
+                            } else if (cn.equals("String")) {
+                                arg = property;
+                            } else {
+                                continue;
+                            }
+                            method.invoke(object, arg);
+                        }
+                    }
+                } catch (Throwable ignored) {
+                }
+            }
+        }
+    }
+}
diff --git a/src/main/java/org/apache/rocketmq/connect/cassandra/config/DbConnectorConfig.java b/src/main/java/org/apache/rocketmq/connect/cassandra/config/DbConnectorConfig.java
new file mode 100644
index 0000000..3dd25c0
--- /dev/null
+++ b/src/main/java/org/apache/rocketmq/connect/cassandra/config/DbConnectorConfig.java
@@ -0,0 +1,110 @@
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.rocketmq.connect.cassandra.config;
+
+import io.openmessaging.KeyValue;
+import org.apache.rocketmq.connect.cassandra.strategy.TaskDivideStrategy;
+
+public abstract class DbConnectorConfig {
+    public TaskDivideStrategy taskDivideStrategy;
+    public String dbUrl;
+    public String dbPort;
+    public String dbUserName;
+    public String dbPassword;
+    public String localDataCenter;
+    public String converter;
+    public int taskParallelism;
+    public String mode;
+
+    public abstract void validate(KeyValue config);
+
+    public abstract <T> T getWhiteTopics();
+
+    public TaskDivideStrategy getTaskDivideStrategy() {
+        return taskDivideStrategy;
+    }
+
+    public void setTaskDivideStrategy(TaskDivideStrategy taskDivideStrategy) {
+        this.taskDivideStrategy = taskDivideStrategy;
+    }
+
+    public String getDbUrl() {
+        return dbUrl;
+    }
+
+    public void setDbUrl(String dbUrl) {
+        this.dbUrl = dbUrl;
+    }
+
+    public String getDbPort() {
+        return dbPort;
+    }
+
+    public void setDbPort(String dbPort) {
+        this.dbPort = dbPort;
+    }
+
+    public String getDbUserName() {
+        return dbUserName;
+    }
+
+    public void setDbUserName(String dbUserName) {
+        this.dbUserName = dbUserName;
+    }
+
+    public String getDbPassword() {
+        return dbPassword;
+    }
+
+    public void setDbPassword(String dbPassword) {
+        this.dbPassword = dbPassword;
+    }
+
+    public String getLocalDataCenter() {
+        return localDataCenter;
+    }
+
+    public void setLocalDataCenter(String localDataCenter) {
+        this.localDataCenter = localDataCenter;
+    }
+
+    public String getConverter() {
+        return converter;
+    }
+
+    public void setConverter(String converter) {
+        this.converter = converter;
+    }
+
+    public int getTaskParallelism() {
+        return taskParallelism;
+    }
+
+    public void setTaskParallelism(int taskParallelism) {
+        this.taskParallelism = taskParallelism;
+    }
+
+    public String getMode() {
+        return mode;
+    }
+
+    public void setMode(String mode) {
+        this.mode = mode;
+    }
+}
diff --git a/src/main/java/org/apache/rocketmq/connect/cassandra/config/SinkDbConnectorConfig.java b/src/main/java/org/apache/rocketmq/connect/cassandra/config/SinkDbConnectorConfig.java
new file mode 100644
index 0000000..3145033
--- /dev/null
+++ b/src/main/java/org/apache/rocketmq/connect/cassandra/config/SinkDbConnectorConfig.java
@@ -0,0 +1,112 @@
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.rocketmq.connect.cassandra.config;
+
+import io.openmessaging.KeyValue;
+import org.apache.rocketmq.connect.cassandra.strategy.DivideStrategyEnum;
+import org.apache.rocketmq.connect.cassandra.strategy.DivideTaskByTopic;
+
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+
+public class SinkDbConnectorConfig extends DbConnectorConfig {
+    private Set<String> whiteList;
+    private String srcNamesrvs;
+    private String srcCluster;
+    private long refreshInterval;
+    private Map<String, Set<TaskTopicInfo>> topicRouteMap;
+
+    public SinkDbConnectorConfig(){
+    }
+
+    @Override
+    public void validate(KeyValue config) {
+        this.taskParallelism = config.getInt(Config.CONN_TASK_PARALLELISM, 1);
+
+        int strategy = config.getInt(Config.CONN_TASK_DIVIDE_STRATEGY, DivideStrategyEnum.BY_TOPIC.ordinal());
+
+        this.taskDivideStrategy = new DivideTaskByTopic();
+
+        buildWhiteList(config);
+
+        this.converter = config.getString(Config.CONN_SOURCE_RECORD_CONVERTER);
+        this.dbUrl = config.getString(Config.CONN_DB_IP);
+        this.dbPort = config.getString(Config.CONN_DB_PORT);
+        this.dbUserName = config.getString(Config.CONN_DB_USERNAME);
+        this.dbPassword = config.getString(Config.CONN_DB_PASSWORD);
+        this.localDataCenter = config.getString(Config.CONN_DB_DATACENTER);
+        this.srcNamesrvs = config.getString(Config.CONN_SOURCE_RMQ);
+        this.srcCluster = config.getString(Config.CONN_SOURCE_CLUSTER);
+        this.refreshInterval = config.getLong(Config.REFRESH_INTERVAL, 3);
+        this.mode = config.getString(Config.CONN_DB_MODE, "bulk");
+
+    }
+
+    private void buildWhiteList(KeyValue config) {
+        this.whiteList = new HashSet<>();
+        String whiteListStr = config.getString(Config.CONN_TOPIC_NAMES, "");
+        String[] wl = whiteListStr.trim().split(",");
+        if (wl.length <= 0)
+            throw new IllegalArgumentException("White list must be not empty.");
+        else {
+            this.whiteList.clear();
+            for (String t : wl) {
+                this.whiteList.add(t.trim());
+            }
+        }
+    }
+
+
+    public Set<String> getWhiteList() {
+        return whiteList;
+    }
+
+    public void setWhiteList(Set<String> whiteList) {
+        this.whiteList = whiteList;
+    }
+
+    public String getSrcNamesrvs() {
+        return this.srcNamesrvs;
+    }
+
+    public String getSrcCluster() {
+        return this.srcCluster;
+    }
+
+    public long getRefreshInterval() {
+        return this.refreshInterval;
+    }
+
+    public Map<String, Set<TaskTopicInfo>> getTopicRouteMap() {
+        return topicRouteMap;
+    }
+
+    public void setTopicRouteMap(Map<String, Set<TaskTopicInfo>> topicRouteMap) {
+        this.topicRouteMap = topicRouteMap;
+    }
+
+    @Override
+    public Set<String> getWhiteTopics() {
+        return getWhiteList();
+    }
+
+}
diff --git a/src/main/java/org/apache/rocketmq/connect/cassandra/config/SourceDbConnectorConfig.java b/src/main/java/org/apache/rocketmq/connect/cassandra/config/SourceDbConnectorConfig.java
new file mode 100644
index 0000000..6a3f685
--- /dev/null
+++ b/src/main/java/org/apache/rocketmq/connect/cassandra/config/SourceDbConnectorConfig.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.connect.cassandra.config;
+
+import com.alibaba.fastjson.JSONObject;
+import io.openmessaging.KeyValue;
+import org.apache.rocketmq.connect.cassandra.strategy.DivideStrategyEnum;
+import org.apache.rocketmq.connect.cassandra.strategy.DivideTaskByTopic;
+
+import java.util.HashMap;
+import java.util.Map;
+
+public class SourceDbConnectorConfig extends DbConnectorConfig{
+
+    private Map<String, String> whiteMap;
+
+    public SourceDbConnectorConfig(){
+    }
+
+    @Override
+    public void validate(KeyValue config) {
+        this.taskParallelism = config.getInt(Config.CONN_TASK_PARALLELISM, 1);
+
+        int strategy = config.getInt(Config.CONN_TASK_DIVIDE_STRATEGY, DivideStrategyEnum.BY_TOPIC.ordinal());
+
+        this.taskDivideStrategy = new DivideTaskByTopic();
+
+        buildWhiteMap(config);
+
+        this.converter = config.getString(Config.CONN_SOURCE_RECORD_CONVERTER);
+        this.dbUrl = config.getString(Config.CONN_DB_IP);
+        this.dbPort = config.getString(Config.CONN_DB_PORT);
+        this.dbUserName = config.getString(Config.CONN_DB_USERNAME);
+        this.dbPassword = config.getString(Config.CONN_DB_PASSWORD);
+        this.localDataCenter = config.getString(Config.CONN_DB_DATACENTER);
+        this.mode = config.getString(Config.CONN_DB_MODE, "bulk");
+
+    }
+
+    private void buildWhiteMap(KeyValue config) {
+        this.whiteMap = new HashMap<>(16);
+        String whiteListStr = config.getString(Config.CONN_WHITE_LIST, "");
+        JSONObject whiteDataBaseObject = JSONObject.parseObject(whiteListStr);
+        if(whiteDataBaseObject.keySet().size() <= 0){
+            throw new IllegalArgumentException("white data base must be not empty.");
+        }else {
+            this.whiteMap.clear();
+            for (String dbName : whiteDataBaseObject.keySet()){
+                JSONObject whiteTableObject = (JSONObject) whiteDataBaseObject.get(dbName);
+                for (String tableName : whiteTableObject.keySet()){
+                    String dbTableKey = dbName + "-" + tableName;
+                    this.whiteMap.put(dbTableKey, whiteTableObject.getString(tableName));
+                }
+            }
+        }
+    }
+
+
+    public Map<String, String> getWhiteMap() {
+        return whiteMap;
+    }
+
+    public void setWhiteMap(Map<String, String> whiteMap) {
+        this.whiteMap = whiteMap;
+    }
+
+    @Override
+    public Map<String, String> getWhiteTopics() {
+        return getWhiteMap();
+    }
+
+}
diff --git a/src/main/java/org/apache/rocketmq/connect/cassandra/config/TaskDivideConfig.java b/src/main/java/org/apache/rocketmq/connect/cassandra/config/TaskDivideConfig.java
new file mode 100644
index 0000000..7c43137
--- /dev/null
+++ b/src/main/java/org/apache/rocketmq/connect/cassandra/config/TaskDivideConfig.java
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.connect.cassandra.config;
+
+public class TaskDivideConfig {
+
+    private String dbUrl;
+
+    private String dbPort;
+
+    private String dbUserName;
+
+    private String dbPassword;
+
+    private String localDataCenter;
+
+    private String srcRecordConverter;
+
+    private int dataType;
+
+    private int taskParallelism;
+
+    private String mode;
+
+    public TaskDivideConfig(String dbUrl, String dbPort, String dbUserName, String dbPassword, String localDataCenter,
+                            String srcRecordConverter, int dataType, int taskParallelism, String mode) {
+        this.dbUrl = dbUrl;
+        this.dbPort = dbPort;
+        this.dbUserName = dbUserName;
+        this.dbPassword = dbPassword;
+        this.localDataCenter = localDataCenter;
+        this.srcRecordConverter = srcRecordConverter;
+        this.dataType = dataType;
+        this.taskParallelism = taskParallelism;
+        this.mode = mode;
+    }
+
+    public String getDbUrl() {
+        return dbUrl;
+    }
+
+    public void setDbUrl(String dbUrl) {
+        this.dbUrl = dbUrl;
+    }
+
+    public String getDbPort() {
+        return dbPort;
+    }
+
+    public void setDbPort(String dbPort) {
+        this.dbPort = dbPort;
+    }
+
+    public String getDbUserName() {
+        return dbUserName;
+    }
+
+    public void setDbUserName(String dbUserName) {
+        this.dbUserName = dbUserName;
+    }
+
+    public String getDbPassword() {
+        return dbPassword;
+    }
+
+    public void setDbPassword(String dbPassword) {
+        this.dbPassword = dbPassword;
+    }
+
+    public String getLocalDataCenter() {
+        return localDataCenter;
+    }
+
+    public void setLocalDataCenter(String localDataCenter) {
+        this.localDataCenter = localDataCenter;
+    }
+
+    public String getSrcRecordConverter() {
+        return srcRecordConverter;
+    }
+
+    public void setSrcRecordConverter(String srcRecordConverter) {
+        this.srcRecordConverter = srcRecordConverter;
+    }
+
+    public int getDataType() {
+        return dataType;
+    }
+
+    public void setDataType(int dataType) {
+        this.dataType = dataType;
+    }
+
+    public int getTaskParallelism() {
+        return taskParallelism;
+    }
+
+    public void setTaskParallelism(int taskParallelism) {
+        this.taskParallelism = taskParallelism;
+    }
+
+    public String getMode() {
+        return mode;
+    }
+
+    public void setMode(String mode) {
+        this.mode = mode;
+    }
+}
diff --git a/src/main/java/org/apache/rocketmq/connect/cassandra/config/TaskTopicInfo.java b/src/main/java/org/apache/rocketmq/connect/cassandra/config/TaskTopicInfo.java
new file mode 100644
index 0000000..074faab
--- /dev/null
+++ b/src/main/java/org/apache/rocketmq/connect/cassandra/config/TaskTopicInfo.java
@@ -0,0 +1,40 @@
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+
+package org.apache.rocketmq.connect.cassandra.config;
+
+import org.apache.rocketmq.common.message.MessageQueue;
+
+public class TaskTopicInfo extends MessageQueue {
+
+    private String targetTopic;
+
+    public TaskTopicInfo(String sourceTopic, String brokerName, int queueId, String targetTopic) {
+        super(sourceTopic, brokerName, queueId);
+        this.targetTopic = targetTopic;
+    }
+
+    public String getTargetTopic() {
+        return this.targetTopic;
+    }
+
+    public void setTargetTopic(String targetTopic) {
+        this.targetTopic = targetTopic;
+    }
+}
diff --git a/src/main/java/org/apache/rocketmq/connect/cassandra/connector/CassandraSinkConnector.java b/src/main/java/org/apache/rocketmq/connect/cassandra/connector/CassandraSinkConnector.java
new file mode 100644
index 0000000..6ce23f6
--- /dev/null
+++ b/src/main/java/org/apache/rocketmq/connect/cassandra/connector/CassandraSinkConnector.java
@@ -0,0 +1,240 @@
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.connect.cassandra.connector;
+
+
+import io.openmessaging.KeyValue;
+import io.openmessaging.connector.api.Task;
+import io.openmessaging.connector.api.sink.SinkConnector;
+import java.util.concurrent.ScheduledFuture;
+import org.apache.commons.lang3.concurrent.BasicThreadFactory;
+import org.apache.rocketmq.client.exception.MQClientException;
+import org.apache.rocketmq.common.protocol.route.BrokerData;
+import org.apache.rocketmq.common.protocol.route.QueueData;
+import org.apache.rocketmq.common.protocol.route.TopicRouteData;
+import org.apache.rocketmq.connect.cassandra.common.CloneUtils;
+import org.apache.rocketmq.connect.cassandra.common.ConstDefine;
+import org.apache.rocketmq.connect.cassandra.common.DataType;
+import org.apache.rocketmq.connect.cassandra.common.Utils;
+import org.apache.rocketmq.connect.cassandra.config.*;
+import org.apache.rocketmq.remoting.RPCHook;
+import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.*;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+
+public class CassandraSinkConnector extends SinkConnector{
+    private static final Logger log = LoggerFactory.getLogger(CassandraSinkConnector.class);
+    private DbConnectorConfig dbConnectorConfig;
+    private volatile boolean configValid = false;
+    private ScheduledExecutorService executor;
+    private HashMap<String, Set<TaskTopicInfo>> topicRouteMap;
+
+    private DefaultMQAdminExt srcMQAdminExt;
+
+    private volatile boolean adminStarted;
+
+    private ScheduledFuture<?> listenerHandle;
+
+    public CassandraSinkConnector() {
+        topicRouteMap = new HashMap<>();
+        dbConnectorConfig = new SinkDbConnectorConfig();
+        executor = Executors.newSingleThreadScheduledExecutor(new BasicThreadFactory.Builder().namingPattern("CassandraSinkConnector-SinkWatcher-%d").daemon(true).build());
+    }
+
+    private synchronized void startMQAdminTools() {
+        if (!configValid || adminStarted) {
+            return;
+        }
+        RPCHook rpcHook = null;
+        this.srcMQAdminExt = new DefaultMQAdminExt(rpcHook);
+        this.srcMQAdminExt.setNamesrvAddr(((SinkDbConnectorConfig) this.dbConnectorConfig).getSrcNamesrvs());
+        this.srcMQAdminExt.setAdminExtGroup(Utils.createGroupName(ConstDefine.CASSANDRA_CONNECTOR_ADMIN_PREFIX));
+        this.srcMQAdminExt.setInstanceName(Utils.createInstanceName(((SinkDbConnectorConfig) this.dbConnectorConfig).getSrcNamesrvs()));
+
+        try {
+            log.info("Trying to start srcMQAdminExt");
+            this.srcMQAdminExt.start();
+            log.info("RocketMQ srcMQAdminExt started");
+
+        } catch (MQClientException e) {
+            log.error("Cassandra Sink Task start failed for `srcMQAdminExt` exception.", e);
+        }
+
+        adminStarted = true;
+    }
+
+    @Override
+    public String verifyAndSetConfig(KeyValue config) {
+        for (String requestKey : Config.REQUEST_CONFIG) {
+            if (!config.containsKey(requestKey)) {
+                return "Request config key: " + requestKey;
+            }
+        }
+        try {
+            this.dbConnectorConfig.validate(config);
+        } catch (IllegalArgumentException e) {
+            return e.getMessage();
+        }
+        this.configValid = true;
+
+        return "";
+    }
+
+    @Override
+    public void start() {
+        startMQAdminTools();
+        startListener();
+    }
+
+    public void startListener() {
+        listenerHandle = executor.scheduleAtFixedRate(new Runnable() {
+            boolean first = true;
+            HashMap<String, Set<TaskTopicInfo>> origin = null;
+
+            @Override
+            public void run() {
+                buildRoute();
+                if (first) {
+                    origin = CloneUtils.clone(topicRouteMap);
+                    first = false;
+                }
+                if (!compare(origin, topicRouteMap)) {
+                    context.requestTaskReconfiguration();
+                    origin = CloneUtils.clone(topicRouteMap);
+                }
+            }
+        }, ((SinkDbConnectorConfig) dbConnectorConfig).getRefreshInterval(), ((SinkDbConnectorConfig) dbConnectorConfig).getRefreshInterval(), TimeUnit.SECONDS);
+    }
+
+    public boolean compare(Map<String, Set<TaskTopicInfo>> origin, Map<String, Set<TaskTopicInfo>> updated) {
+        if (origin.size() != updated.size()) {
+            return false;
+        }
+        for (Map.Entry<String, Set<TaskTopicInfo>> entry : origin.entrySet()) {
+            if (!updated.containsKey(entry.getKey())) {
+                return false;
+            }
+            Set<TaskTopicInfo> originTasks = entry.getValue();
+            Set<TaskTopicInfo> updateTasks = updated.get(entry.getKey());
+            if (originTasks.size() != updateTasks.size()) {
+                return false;
+            }
+
+            if (!originTasks.containsAll(updateTasks)) {
+                return false;
+            }
+        }
+
+        return true;
+    }
+
+    public void buildRoute() {
+        String srcCluster = ((SinkDbConnectorConfig) this.dbConnectorConfig).getSrcCluster();
+        try {
+            for (String topic : ((SinkDbConnectorConfig) this.dbConnectorConfig).getWhiteList()) {
+
+                // different from BrokerData with cluster field, which can ensure the brokerData is from expected cluster.
+                // QueueData use brokerName as unique info on cluster of rocketmq. so when we want to get QueueData of
+                // expected cluster, we should get brokerNames of expected cluster, and then filter queueDatas.
+                List<BrokerData> brokerList = Utils.examineBrokerData(this.srcMQAdminExt, topic, srcCluster);
+                Set<String> brokerNameSet = new HashSet<String>();
+                for (BrokerData b : brokerList) {
+                    brokerNameSet.add(b.getBrokerName());
+                }
+
+                TopicRouteData topicRouteData = srcMQAdminExt.examineTopicRouteInfo(topic);
+                if (!topicRouteMap.containsKey(topic)) {
+                    topicRouteMap.put(topic, new HashSet<>(16));
+                }
+                for (QueueData qd : topicRouteData.getQueueDatas()) {
+                    if (brokerNameSet.contains(qd.getBrokerName())) {
+                        for (int i = 0; i < qd.getReadQueueNums(); i++) {
+                            TaskTopicInfo taskTopicInfo = new TaskTopicInfo(topic, qd.getBrokerName(), i, null);
+                            topicRouteMap.get(topic).add(taskTopicInfo);
+                        }
+                    }
+                }
+            }
+        } catch (Exception e) {
+            log.error("Fetch topic list error.", e);
+        } finally {
+            // srcMQAdminExt.shutdown();
+        }
+    }
+
+
+    /**
+     * We need to reason why we don't call srcMQAdminExt.shutdown() here, and why
+     * it can be applied to srcMQAdminExt
+     */
+    @Override
+    public void stop() {
+        listenerHandle.cancel(true);
+        // srcMQAdminExt.shutdown();
+    }
+
+    @Override
+    public void pause() {
+
+    }
+
+    @Override
+    public void resume() {
+
+    }
+
+    @Override
+    public Class<? extends Task> taskClass() {
+        return CassandraSinkTask.class;
+    }
+
+    @Override
+    public List<KeyValue> taskConfigs() {
+        log.info("List.start");
+        if (!configValid) {
+            return new ArrayList<KeyValue>();
+        }
+
+        startMQAdminTools();
+
+        buildRoute();
+
+        TaskDivideConfig tdc = new TaskDivideConfig(
+            this.dbConnectorConfig.getDbUrl(),
+            this.dbConnectorConfig.getDbPort(),
+            this.dbConnectorConfig.getDbUserName(),
+            this.dbConnectorConfig.getDbPassword(),
+            this.dbConnectorConfig.getLocalDataCenter(),
+            this.dbConnectorConfig.getConverter(),
+            DataType.COMMON_MESSAGE.ordinal(),
+            this.dbConnectorConfig.getTaskParallelism(),
+            this.dbConnectorConfig.getMode()
+        );
+
+        ((SinkDbConnectorConfig) this.dbConnectorConfig).setTopicRouteMap(topicRouteMap);
+
+        return this.dbConnectorConfig.getTaskDivideStrategy().divide(this.dbConnectorConfig, tdc);
+    }
+}
diff --git a/src/main/java/org/apache/rocketmq/connect/cassandra/connector/CassandraSinkTask.java b/src/main/java/org/apache/rocketmq/connect/cassandra/connector/CassandraSinkTask.java
new file mode 100644
index 0000000..a8e9b0a
--- /dev/null
+++ b/src/main/java/org/apache/rocketmq/connect/cassandra/connector/CassandraSinkTask.java
@@ -0,0 +1,161 @@
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.connect.cassandra.connector;
+
+import com.alibaba.fastjson.JSONObject;
+
+import com.datastax.oss.driver.api.core.CqlSession;
+import io.openmessaging.KeyValue;
+import io.openmessaging.connector.api.common.QueueMetaData;
+import io.openmessaging.connector.api.data.EntryType;
+import io.openmessaging.connector.api.data.Field;
+import io.openmessaging.connector.api.data.Schema;
+import io.openmessaging.connector.api.data.SinkDataEntry;
+import io.openmessaging.connector.api.sink.SinkTask;
+import org.apache.rocketmq.connect.cassandra.config.Config;
+import org.apache.rocketmq.connect.cassandra.common.DBUtils;
+import org.apache.rocketmq.connect.cassandra.config.ConfigUtil;
+import org.apache.rocketmq.connect.cassandra.sink.Updater;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+
+
+/**
+ * In the naming, we are using database for "keyspaces" and table for "columnFamily"
+ * This is because we kind of want the abstract data source to be aligned with SQL databases
+ */
+public class CassandraSinkTask extends SinkTask {
+    private static final Logger log = LoggerFactory.getLogger(CassandraSinkTask.class);
+
+    private Config config;
+
+    private CqlSession cqlSession;
+    private Updater updater;
+    private BlockingQueue<Updater> tableQueue = new LinkedBlockingQueue<Updater>();
+
+    public CassandraSinkTask() {
+        this.config = new Config();
+    }
+
+    @Override
+    public void put(Collection<SinkDataEntry> sinkDataEntries) {
+        try {
+            if (tableQueue.size() > 1) {
+                updater = tableQueue.poll(1000, TimeUnit.MILLISECONDS);
+            } else {
+                updater = tableQueue.peek();
+            }
+            log.info("Cassandra Sink Task trying to put()");
+            for (SinkDataEntry record : sinkDataEntries) {
+                Map<Field, Object[]> fieldMap = new HashMap<>();
+                Object[] payloads = record.getPayload();
+                Schema schema = record.getSchema();
+                EntryType entryType = record.getEntryType();
+                String cfName = schema.getName();
+                String keyspaceName = schema.getDataSource();
+                List<Field> fields = schema.getFields();
+                Boolean parseError = false;
+                if (!fields.isEmpty()) {
+                    for (Field field : fields) {
+                        Object fieldValue = payloads[field.getIndex()];
+                        Object[] value = JSONObject.parseArray((String)fieldValue).toArray();
+                        if (value.length == 2) {
+                            fieldMap.put(field, value);
+                        } else {
+                            log.error("parseArray error, fieldValue:{}", fieldValue);
+                            parseError = true;
+                        }
+                    }
+                }
+                if (!parseError) {
+                    log.info("Cassandra Sink Task trying to call updater.push()");
+                    Boolean isSuccess = updater.push(keyspaceName, cfName, fieldMap, entryType);
+                    if (!isSuccess) {
+                        log.error("push data error, keyspaceName:{}, cfName:{}, entryType:{}, fieldMap:{}", keyspaceName, cfName, fieldMap, entryType);
+                    }
+                }
+            }
+        } catch (Exception e) {
+            log.error("put sinkDataEntries error, {}", e);
+        }
+    }
+
+    @Override
+    public void commit(Map<QueueMetaData, Long> map) {
+
+    }
+
+    /**
+     * Remember always close the CqlSession according to
+     * https://docs.datastax.com/en/developer/java-driver/4.5/manual/core/
+     * @param props
+     */
+    @Override
+    public void start(KeyValue props) {
+        try {
+            ConfigUtil.load(props, this.config);
+            cqlSession = DBUtils.initCqlSession(config);
+            log.info("init data source success");
+        } catch (Exception e) {
+            log.error("Cannot start Cassandra Sink Task because of configuration error{}", e);
+        }
+        String mode = config.getMode();
+        if (mode.equals("bulk")) {
+            Updater updater = new Updater(config, cqlSession);
+            try {
+                updater.start();
+                tableQueue.add(updater);
+            } catch (Exception e) {
+                log.error("fail to start updater{}", e);
+            }
+        }
+
+    }
+
+    @Override
+    public void stop() {
+        try {
+            if (cqlSession != null){
+                cqlSession.close();
+                log.info("cassandra sink task connection is closed.");
+            }
+        } catch (Throwable e) {
+            log.warn("sink task stop error while closing connection to {}", "cassandra", e);
+        }
+    }
+
+    @Override
+    public void pause() {
+
+    }
+
+    @Override
+    public void resume() {
+
+    }
+}
diff --git a/src/main/java/org/apache/rocketmq/connect/cassandra/connector/CassandraSourceConnector.java b/src/main/java/org/apache/rocketmq/connect/cassandra/connector/CassandraSourceConnector.java
new file mode 100644
index 0000000..a8adc74
--- /dev/null
+++ b/src/main/java/org/apache/rocketmq/connect/cassandra/connector/CassandraSourceConnector.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.connect.cassandra.connector;
+
+import io.openmessaging.KeyValue;
+import io.openmessaging.connector.api.Task;
+import io.openmessaging.connector.api.source.SourceConnector;
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.rocketmq.connect.cassandra.common.DataType;
+import org.apache.rocketmq.connect.cassandra.config.Config;
+import org.apache.rocketmq.connect.cassandra.config.DbConnectorConfig;
+import org.apache.rocketmq.connect.cassandra.config.SourceDbConnectorConfig;
+import org.apache.rocketmq.connect.cassandra.config.TaskDivideConfig;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class CassandraSourceConnector extends SourceConnector {
+    private static final Logger log = LoggerFactory.getLogger(org.apache.rocketmq.connect.cassandra.connector.CassandraSourceConnector.class);
+    private DbConnectorConfig dbConnectorConfig;
+    private volatile boolean configValid = false;
+
+    public CassandraSourceConnector() {
+        dbConnectorConfig = new SourceDbConnectorConfig();
+    }
+
+    @Override
+    public String verifyAndSetConfig(KeyValue config) {
+
+        log.info("CassandraSourceConnector verifyAndSetConfig enter");
+        for (String requestKey : Config.REQUEST_CONFIG) {
+
+            if (!config.containsKey(requestKey)) {
+                return "Request config key: " + requestKey;
+            }
+        }
+        try {
+            this.dbConnectorConfig.validate(config);
+        } catch (IllegalArgumentException e) {
+            return e.getMessage();
+        }
+        this.configValid = true;
+
+        return "";
+    }
+
+    @Override
+    public void start() {
+
+    }
+
+    @Override
+    public void stop() {
+
+    }
+
+    @Override
+    public void pause() {
+
+    }
+
+    @Override
+    public void resume() {
+
+    }
+
+    @Override
+    public Class<? extends Task> taskClass() {
+        return CassandraSourceTask.class;
+    }
+
+    @Override
+    public List<KeyValue> taskConfigs() {
+        log.info("List.start");
+        if (!configValid) {
+            return new ArrayList<KeyValue>();
+        }
+
+        TaskDivideConfig tdc = new TaskDivideConfig(
+                this.dbConnectorConfig.getDbUrl(),
+                this.dbConnectorConfig.getDbPort(),
+                this.dbConnectorConfig.getDbUserName(),
+                this.dbConnectorConfig.getDbPassword(),
+                this.dbConnectorConfig.getLocalDataCenter(),
+                this.dbConnectorConfig.getConverter(),
+                DataType.COMMON_MESSAGE.ordinal(),
+                this.dbConnectorConfig.getTaskParallelism(),
+                this.dbConnectorConfig.getMode()
+        );
+        return this.dbConnectorConfig.getTaskDivideStrategy().divide(this.dbConnectorConfig, tdc);
+    }
+
+}
diff --git a/src/main/java/org/apache/rocketmq/connect/cassandra/connector/CassandraSourceTask.java b/src/main/java/org/apache/rocketmq/connect/cassandra/connector/CassandraSourceTask.java
new file mode 100644
index 0000000..cac44ed
--- /dev/null
+++ b/src/main/java/org/apache/rocketmq/connect/cassandra/connector/CassandraSourceTask.java
@@ -0,0 +1,168 @@
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.connect.cassandra.connector;
+
+import com.alibaba.fastjson.JSON;
+import com.alibaba.fastjson.JSONObject;
+import com.datastax.oss.driver.api.core.CqlSession;
+import io.openmessaging.KeyValue;
+import io.openmessaging.connector.api.data.DataEntryBuilder;
+import io.openmessaging.connector.api.data.EntryType;
+import io.openmessaging.connector.api.data.Field;
+import io.openmessaging.connector.api.data.Schema;
+import io.openmessaging.connector.api.data.SourceDataEntry;
+import io.openmessaging.connector.api.source.SourceTask;
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.Timer;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import javax.sql.DataSource;
+
+import org.apache.rocketmq.connect.cassandra.common.ConstDefine;
+import org.apache.rocketmq.connect.cassandra.common.DBUtils;
+import org.apache.rocketmq.connect.cassandra.config.Config;
+import org.apache.rocketmq.connect.cassandra.config.ConfigUtil;
+import org.apache.rocketmq.connect.cassandra.schema.Table;
+import org.apache.rocketmq.connect.cassandra.schema.column.ColumnParser;
+import org.apache.rocketmq.connect.cassandra.source.Querier;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class CassandraSourceTask extends SourceTask {
+
+    private static final Logger log = LoggerFactory.getLogger(org.apache.rocketmq.connect.cassandra.connector.CassandraSourceTask.class);
+
+    private Config config;
+
+    private DataSource dataSource;
+
+    private CqlSession cqlSession;
+
+    BlockingQueue<Querier> tableQueue = new LinkedBlockingQueue<Querier>();
+    static final String INCREMENTING_FIELD = "incrementing";
+    static final String TIMESTAMP_FIELD = "timestamp";
+    private Querier querier;
+
+    public CassandraSourceTask() {
+        this.config = new Config();
+    }
+
+    @Override
+    public Collection<SourceDataEntry> poll() {
+        List<SourceDataEntry> res = new ArrayList<>();
+        try {
+            if (tableQueue.size() > 1)
+                querier = tableQueue.poll(1000, TimeUnit.MILLISECONDS);
+            else
+                querier = tableQueue.peek();
+            Timer timer = new Timer();
+            try {
+                Thread.currentThread();
+                Thread.sleep(1000);//毫秒
+            } catch (Exception e) {
+                throw e;
+            }
+            querier.poll();
+            for (Table dataRow : querier.getList()) {
+                JSONObject jsonObject = new JSONObject();
+                jsonObject.put("nextQuery", "database");
+                jsonObject.put("nextPosition", "table");
+                Schema schema = new Schema();
+                schema.setDataSource(dataRow.getDatabase());
+                schema.setName(dataRow.getName());
+                schema.setFields(new ArrayList<>());
+                for (int i = 0; i < dataRow.getColList().size(); i++) {
+                    String columnName = dataRow.getColList().get(i);
+                    String rawDataType = dataRow.getRawDataTypeList().get(i);
+                    Field field = new Field(i, columnName, ColumnParser.mapConnectorFieldType(rawDataType));
+                    schema.getFields().add(field);
+                }
+                DataEntryBuilder dataEntryBuilder = new DataEntryBuilder(schema);
+                dataEntryBuilder.timestamp(System.currentTimeMillis()).queue(dataRow.getName())
+                        .entryType(EntryType.UPDATE);
+                for (int i = 0; i < dataRow.getColList().size(); i++) {
+                    Object[] value = new Object[2];
+                    value[0] = value[1] = dataRow.getParserList().get(i).getValue(dataRow.getDataList().get(i));
+                    dataEntryBuilder.putFiled(dataRow.getColList().get(i), JSONObject.toJSONString(value));
+                }
+
+                SourceDataEntry sourceDataEntry = dataEntryBuilder.buildSourceDataEntry(
+                        ByteBuffer.wrap((ConstDefine.PREFIX + config.getDbUrl() + config.getDbPort()).getBytes(StandardCharsets.UTF_8)),
+                        ByteBuffer.wrap(jsonObject.toJSONString().getBytes(StandardCharsets.UTF_8)));
+                res.add(sourceDataEntry);
+                log.debug("sourceDataEntry : {}", JSONObject.toJSONString(sourceDataEntry));
+            }
+        } catch (Exception e) {
+            log.error("Cassandra task poll error, current config:" + JSON.toJSONString(config), e);
+        }
+        log.debug("dataEntry poll successfully,{}", JSONObject.toJSONString(res));
+        return res;
+    }
+
+    @Override
+    public void start(KeyValue props) {
+        try {
+            ConfigUtil.load(props, this.config);
+            cqlSession = DBUtils.initCqlSession(config);
+            log.info("init data source success");
+        } catch (Exception e) {
+            log.error("Cannot start Cassandra Source Task because of configuration error{}", e);
+        }
+        Map<Map<String, String>, Map<String, Object>> offsets = null;
+        String mode = config.getMode();
+        if (mode.equals("bulk")) {
+            Querier querier = new Querier(config, cqlSession);
+            try {
+                querier.start();
+                tableQueue.add(querier);
+            } catch (Exception e) {
+                log.error("start querier failed in bulk mode{}", e);
+            }
+        }
+
+    }
+
+    @Override
+    public void stop() {
+        try {
+            if (cqlSession != null) {
+                cqlSession.close();
+                log.info("Cassandra source task connection is closed.");
+            }
+        } catch (Throwable e) {
+            log.warn("source task stop error while closing connection to {}", "Cassandra", e);
+        }
+    }
+
+    @Override
+    public void pause() {
+
+    }
+
+    @Override
+    public void resume() {
+
+    }
+}
diff --git a/src/main/java/org/apache/rocketmq/connect/cassandra/schema/Database.java b/src/main/java/org/apache/rocketmq/connect/cassandra/schema/Database.java
new file mode 100644
index 0000000..c8f69e6
--- /dev/null
+++ b/src/main/java/org/apache/rocketmq/connect/cassandra/schema/Database.java
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.rocketmq.connect.cassandra.schema;
+
+import com.datastax.oss.driver.api.core.CqlSession;
+import com.datastax.oss.driver.api.core.cql.ResultSet;
+import com.datastax.oss.driver.api.core.cql.Row;
+import com.datastax.oss.driver.api.core.cql.SimpleStatement;
+import com.datastax.oss.driver.api.querybuilder.QueryBuilder;
+import com.datastax.oss.driver.api.querybuilder.select.Select;
+import com.datastax.oss.driver.api.querybuilder.select.SelectFrom;
+import com.datastax.oss.driver.api.querybuilder.term.Term;
+import io.openmessaging.connector.api.data.FieldType;
+import org.apache.rocketmq.connect.cassandra.schema.column.ColumnParser;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+
+public class Database {
+    private static final Logger LOGGER = LoggerFactory.getLogger(Database.class);
+
+    private static final String CQL = "SELECT * FROM system_schema.columns \n" +
+            "WHERE keyspace_name = 'keyspace_name' AND table_name = 'table_name'";
+
+    private static final String SCHEMA_SYSTEM_SCHEMA = "system_schema";
+
+    private static final String TABLE_COLUMNS = "columns";
+
+    private static final String COLUMN_KEYSPACE_NAME = "keyspace_name";
+
+    private static final String COLUMN_TABLE_NAME = "table_name";
+
+    private static final String COLUMN_COLUMN_NAME = "column_name";
+
+    private static final String COLUMN_TYPE = "type";
+
+    private static final String GENERAL_CHARSET = "utf-8";
+
+    private String name;
+
+    private CqlSession cqlSession;
+
+    private Map<String, Table> tableMap = new HashMap<String, Table>();
+
+    public Set<String> tableWhiteList;
+
+    public Map<String, Map<String, String>> tableFilterMap;
+
+    public Database(String name, CqlSession cqlSession, Set<String> tableWhiteList, Map<String, Map<String, String>> tableFilterMap) {
+        this.name = name;
+        this.cqlSession = cqlSession;
+        this.tableWhiteList = tableWhiteList;
+        this.tableFilterMap = tableFilterMap;
+
+    }
+
+    public void init(){
+        Select selectFrom = QueryBuilder.selectFrom(SCHEMA_SYSTEM_SCHEMA, TABLE_COLUMNS)
+                .all()
+                .whereColumn(COLUMN_KEYSPACE_NAME)
+                .isEqualTo(QueryBuilder.literal(name));
+
+
+        SimpleStatement stmt;
+        boolean finishUpdate = false;
+        LOGGER.info("trying to execute sql query,{}", selectFrom.asCql());
+        ResultSet result = null;
+        try {
+            while (!cqlSession.isClosed() && !finishUpdate){
+                stmt = selectFrom.build();
+                result = cqlSession.execute(stmt);
+                if (result.wasApplied()) {
+                    LOGGER.info("query columns success, executed cql query {}", selectFrom.asCql());
+                }
+                finishUpdate = true;
+            }
+
+            for (Row row : result) {
+                String tableName = row.getString(COLUMN_TABLE_NAME);
+                String columnName = row.getString(COLUMN_COLUMN_NAME);
+                String columnType = row.getString(COLUMN_TYPE);
+
+
+
+                ColumnParser columnParser = ColumnParser.getColumnParser(columnType, columnType, GENERAL_CHARSET);
+
+                if (!tableWhiteList.contains(tableName)){
+                    continue;
+                }
+                if (!tableMap.containsKey(tableName)) {
+                    addTable(tableName);
+                }
+                Table table = tableMap.get(tableName);
+                table.addCol(columnName);
+                table.addParser(columnParser);
+                table.addRawDataType(columnType);
+                table.setFilterMap(tableFilterMap.get(tableName));
+            }
+        } catch (Exception e) {
+            LOGGER.error("init cassandra Schema failure,{}", e);
+        }
+
+    }
+
+    private void addTable(String tableName) {
+
+        LOGGER.info("Schema load -- DATABASE:{},\tTABLE:{}", name, tableName);
+
+        Table table = new Table(name, tableName);
+        tableMap.put(tableName, table);
+    }
+
+    public Table getTable(String tableName) {
+
+        return tableMap.get(tableName);
+    }
+
+    public Map<String, Table> getTableMap() {
+        return tableMap;
+    }
+}
diff --git a/src/main/java/org/apache/rocketmq/connect/cassandra/schema/Schema.java b/src/main/java/org/apache/rocketmq/connect/cassandra/schema/Schema.java
new file mode 100644
index 0000000..01054e0
--- /dev/null
+++ b/src/main/java/org/apache/rocketmq/connect/cassandra/schema/Schema.java
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.rocketmq.connect.cassandra.schema;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import com.datastax.oss.driver.api.core.CqlSession;
+import com.datastax.oss.driver.api.core.cql.ResultSet;
+import com.datastax.oss.driver.api.core.cql.Row;
+import com.datastax.oss.driver.api.core.cql.SimpleStatement;
+import com.datastax.oss.driver.api.querybuilder.select.Select;
+import com.datastax.oss.driver.api.querybuilder.select.SelectFrom;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.datastax.oss.driver.api.querybuilder.QueryBuilder;
+
+public class Schema {
+    private static final Logger LOGGER = LoggerFactory.getLogger(Schema.class);
+
+    private static final String CQL = "SELECT * FROM system_schema.keyspaces";
+
+    private static final String SCHEMA_SYSTEM_SCHEMA = "system_schema";
+
+    private static final String TABLE_KEYSPACES = "keyspaces";
+
+    private static final String COLUMN_KEYSPACE_NAME = "keyspace_name";
+
+    private static final List<String> IGNORED_DATABASES = new ArrayList<>(
+        Arrays.asList(new String[] {"system_schema", "system", "system_auth", "system_distributed", "system_traces"})
+    );
+
+    private CqlSession cqlSession;
+
+    private Map<String, Database> dbMap;
+
+    public Map<String, Set<String>> dbTableMap;
+
+    public Map<String, Map<String, String>> tableFilterMap;
+
+    public Schema(CqlSession cqlSession) {
+        this.cqlSession = cqlSession;
+        this.dbTableMap = new HashMap<>();
+        this.tableFilterMap = new HashMap<>();
+    }
+
+    public void load() throws Exception {
+
+        dbMap = new HashMap<>();
+
+        Select selectFrom = QueryBuilder.selectFrom(SCHEMA_SYSTEM_SCHEMA, TABLE_KEYSPACES)
+                .column(COLUMN_KEYSPACE_NAME);
+
+
+        SimpleStatement stmt;
+        boolean finishUpdate = false;
+        LOGGER.info("trying to execute sql query,{}", selectFrom.asCql());
+        ResultSet result = null;
+        try {
+            while (!cqlSession.isClosed() && !finishUpdate){
+                stmt = selectFrom.build();
+                result = cqlSession.execute(stmt);
+                if (result.wasApplied()) {
+                    LOGGER.info("update table success, executed cql query {}", selectFrom.asCql());
+                }
+                finishUpdate = true;
+            }
+
+            for (Row row : result) {
+                String dbName = row.getString(COLUMN_KEYSPACE_NAME);
+                if (!IGNORED_DATABASES.contains(dbName) && dbTableMap.keySet().contains(dbName)) {
+                    Database database = new Database(dbName, cqlSession, dbTableMap.get(dbName), tableFilterMap);
+                    dbMap.put(dbName, database);
+                }
+            }
+        } catch (Exception e) {
+            LOGGER.error("init cassandra Schema failure,{}", e);
+        }
+
+        for (Database db : dbMap.values()) {
+            db.init();
+        }
+
+    }
+
+//    public Table getTable(String dbName, String tableName) {
+//
+//        if (dbMap == null) {
+//            reload();
+//        }
+//
+//        Database database = dbMap.get(dbName);
+//        if (database == null) {
+//            return null;
+//        }
+//
+//        Table table = database.getTable(tableName);
+//        if (table == null) {
+//            return null;
+//        }
+//
+//        return table;
+//    }
+
+    private void reload() {
+
+        while (true) {
+            try {
+                load();
+                break;
+            } catch (Exception e) {
+                LOGGER.error("Cassandra Source Connector reload schema error.", e);
+            }
+        }
+    }
+
+    public void reset() {
+        dbMap = null;
+    }
+
+    public Map<String, Database> getDbMap() {
+        return dbMap;
+    }
+}
+
diff --git a/src/main/java/org/apache/rocketmq/connect/cassandra/schema/Table.java b/src/main/java/org/apache/rocketmq/connect/cassandra/schema/Table.java
new file mode 100644
index 0000000..902b797
--- /dev/null
+++ b/src/main/java/org/apache/rocketmq/connect/cassandra/schema/Table.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+
+package org.apache.rocketmq.connect.cassandra.schema;
+
+import org.apache.rocketmq.connect.cassandra.schema.column.ColumnParser;
+
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+
+public class Table {
+
+    private String database;
+    private String name;
+    private List<String> colList = new LinkedList<>();
+    private List<ColumnParser> parserList = new LinkedList<>();
+    private List<String> rawDataTypeList = new LinkedList<>();
+    private List<Object> dataList = new LinkedList<>();
+    private Map<String, String> filterMap = new HashMap<>();
+
+    public Table(String database, String table) {
+        this.database = database;
+        this.name = table;
+    }
+
+    public void addCol(String column) {
+        colList.add(column);
+    }
+
+    public void setParserList(List<ColumnParser> parserList) {
+        this.parserList = parserList;
+    }
+
+    public void setRawDataTypeList(List<String> rawDataTypeList) {
+        this.rawDataTypeList = rawDataTypeList;
+    }
+
+    public void addParser(ColumnParser columnParser) {
+        parserList.add(columnParser);
+    }
+
+    public void addRawDataType(String rawDataType) {
+        this.rawDataTypeList.add(rawDataType);
+    }
+
+    public List<String> getColList() {
+        return colList;
+    }
+
+    public List<String> getRawDataTypeList() {
+        return rawDataTypeList;
+    }
+
+    public String getDatabase() {
+        return database;
+    }
+
+    public String getName() {
+        return name;
+    }
+
+    public List<ColumnParser> getParserList() {
+        return parserList;
+    }
+
+    public List<Object> getDataList() {
+        return dataList;
+    }
+
+    public void setDataList(List<Object> dataList) {
+        this.dataList = dataList;
+    }
+
+    public void setColList(List<String> colList) {
+        this.colList = colList;
+    }
+
+    public Map<String, String> getFilterMap() {
+        return filterMap;
+    }
+
+    public void setFilterMap(Map<String, String> filterMap) {
+        this.filterMap = filterMap;
+    }
+}
\ No newline at end of file
diff --git a/src/main/java/org/apache/rocketmq/connect/cassandra/schema/column/BigIntColumnParser.java b/src/main/java/org/apache/rocketmq/connect/cassandra/schema/column/BigIntColumnParser.java
new file mode 100644
index 0000000..0506469
--- /dev/null
+++ b/src/main/java/org/apache/rocketmq/connect/cassandra/schema/column/BigIntColumnParser.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.connect.cassandra.schema.column;
+
+import java.math.BigInteger;
+
+public class BigIntColumnParser extends ColumnParser {
+
+    private static BigInteger max = BigInteger.ONE.shiftLeft(64);
+
+    private boolean signed;
+
+    public BigIntColumnParser(String colType) {
+        this.signed = !colType.matches(".* unsigned$");
+    }
+
+    @Override
+    public Object getValue(Object value) {
+
+        if (value == null) {
+            return null;
+        }
+
+        if (value instanceof BigInteger) {
+            return value;
+        }
+
+        Long l = (Long) value;
+        if (!signed && l < 0) {
+            return max.add(BigInteger.valueOf(l));
+        } else {
+            return l;
+        }
+    }
+}
diff --git a/src/main/java/org/apache/rocketmq/connect/cassandra/schema/column/BooleanColumnParser.java b/src/main/java/org/apache/rocketmq/connect/cassandra/schema/column/BooleanColumnParser.java
new file mode 100644
index 0000000..8d5b2bb
--- /dev/null
+++ b/src/main/java/org/apache/rocketmq/connect/cassandra/schema/column/BooleanColumnParser.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.rocketmq.connect.cassandra.schema.column;
+
+public class BooleanColumnParser extends ColumnParser{
+    @Override
+    public Object getValue(Object value) {
+        if (value == null) {
+            return null;
+        }
+
+        if (value instanceof Boolean) {
+            return value;
+        }
+
+        return value;
+    }
+}
diff --git a/src/main/java/org/apache/rocketmq/connect/cassandra/schema/column/ColumnParser.java b/src/main/java/org/apache/rocketmq/connect/cassandra/schema/column/ColumnParser.java
new file mode 100644
index 0000000..1eab587
--- /dev/null
+++ b/src/main/java/org/apache/rocketmq/connect/cassandra/schema/column/ColumnParser.java
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.rocketmq.connect.cassandra.schema.column;
+
+import io.openmessaging.connector.api.data.FieldType;
+
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+public abstract class ColumnParser {
+
+
+    /**
+     * Currently this class implementation is not complete yet.
+     * @param dataType
+     * @param colType
+     * @param charset
+     * @return
+     */
+    public static ColumnParser getColumnParser(String dataType, String colType, String charset) {
+
+        switch (dataType) {
+            case "tinyint":
+            case "smallint":
+            case "mediumint":
+            case "varint":
+            case "int":
+                return new IntColumnParser(dataType, colType);
+            case "bigint":
+                return new BigIntColumnParser(colType);
+            case "tinytext":
+            case "text":
+            case "mediumtext":
+            case "longtext":
+            case "varchar":
+            case "ascii":
+            case "char":
+                return new StringColumnParser(charset);
+            case "date":
+            case "datetime":
+            case "timestamp":
+                return new DateTimeColumnParser();
+            case "time":
+                return new TimeColumnParser();
+            case "year":
+                return new YearColumnParser();
+            case "enum":
+                return new EnumColumnParser(colType);
+            case "set":
+                return new SetColumnParser(colType);
+            case "boolean":
+                return new BooleanColumnParser();
+            default:
+                return new DefaultColumnParser();
+        }
+    }
+
+    public static FieldType mapConnectorFieldType(String dataType) {
+
+        switch (dataType) {
+            case "tinyint":
+            case "smallint":
+            case "mediumint":
+            case "int":
+                return FieldType.INT32;
+            case "bigint":
+                return FieldType.BIG_INTEGER;
+            case "tinytext":
+            case "text":
+            case "mediumtext":
+            case "longtext":
+            case "varchar":
+            case "char":
+                return FieldType.STRING;
+            case "date":
+            case "datetime":
+            case "timestamp":
+            case "time":
+            case "year":
+                return FieldType.DATETIME;
+            case "enum":
+                return null;
+            case "set":
+                return null;
+            default:
+                return FieldType.BYTES;
+        }
+    }
+
+    public static String[] extractEnumValues(String colType) {
+        String[] enumValues = {};
+        Matcher matcher = Pattern.compile("(enum|set)\\((.*)\\)").matcher(colType);
+        if (matcher.matches()) {
+            enumValues = matcher.group(2).replace("'", "").split(",");
+        }
+
+        return enumValues;
+    }
+
+    public abstract Object getValue(Object value);
+
+}
diff --git a/src/main/java/org/apache/rocketmq/connect/cassandra/schema/column/DateTimeColumnParser.java b/src/main/java/org/apache/rocketmq/connect/cassandra/schema/column/DateTimeColumnParser.java
new file mode 100644
index 0000000..b110a19
--- /dev/null
+++ b/src/main/java/org/apache/rocketmq/connect/cassandra/schema/column/DateTimeColumnParser.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.connect.cassandra.schema.column;
+
+import java.sql.Timestamp;
+import java.text.SimpleDateFormat;
+import java.util.Date;
+import java.util.TimeZone;
+
+public class DateTimeColumnParser extends ColumnParser {
+
+    private static SimpleDateFormat dateTimeFormat;
+    private static SimpleDateFormat dateTimeUtcFormat;
+
+    static {
+        dateTimeFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
+        dateTimeUtcFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
+        dateTimeUtcFormat.setTimeZone(TimeZone.getTimeZone("GMT+8"));
+    }
+
+    @Override
+    public Object getValue(Object value) {
+
+        if (value == null) {
+            return null;
+        }
+
+        if (value instanceof Timestamp) {
+            return dateTimeFormat.format(value);
+        }
+
+        if (value instanceof Long) {
+            return dateTimeUtcFormat.format(new Date((Long) value));
+        }
+
+        return value;
+    }
+}
diff --git a/src/main/java/org/apache/rocketmq/connect/cassandra/schema/column/DefaultColumnParser.java b/src/main/java/org/apache/rocketmq/connect/cassandra/schema/column/DefaultColumnParser.java
new file mode 100644
index 0000000..01d8d1a
--- /dev/null
+++ b/src/main/java/org/apache/rocketmq/connect/cassandra/schema/column/DefaultColumnParser.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.connect.cassandra.schema.column;
+
+import org.apache.commons.codec.binary.Base64;
+
+public class DefaultColumnParser extends ColumnParser {
+
+    @Override
+    public Object getValue(Object value) {
+
+        if (value == null) {
+            return null;
+        }
+
+        if (value instanceof byte[]) {
+            return Base64.encodeBase64String((byte[]) value);
+        }
+
+        return value;
+    }
+}
diff --git a/src/main/java/org/apache/rocketmq/connect/cassandra/schema/column/EnumColumnParser.java b/src/main/java/org/apache/rocketmq/connect/cassandra/schema/column/EnumColumnParser.java
new file mode 100644
index 0000000..245dfd6
--- /dev/null
+++ b/src/main/java/org/apache/rocketmq/connect/cassandra/schema/column/EnumColumnParser.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.connect.cassandra.schema.column;
+
+public class EnumColumnParser extends ColumnParser {
+
+    private String[] enumValues;
+
+    public EnumColumnParser(String colType) {
+        enumValues = extractEnumValues(colType);
+    }
+
+    @Override
+    public Object getValue(Object value) {
+
+        if (value == null) {
+            return null;
+        }
+
+        if (value instanceof String) {
+            return value;
+        }
+
+        Integer i = (Integer) value;
+        if (i == 0) {
+            return null;
+        } else {
+            return enumValues[i - 1];
+        }
+    }
+}
diff --git a/src/main/java/org/apache/rocketmq/connect/cassandra/schema/column/IntColumnParser.java b/src/main/java/org/apache/rocketmq/connect/cassandra/schema/column/IntColumnParser.java
new file mode 100644
index 0000000..2257682
--- /dev/null
+++ b/src/main/java/org/apache/rocketmq/connect/cassandra/schema/column/IntColumnParser.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.connect.cassandra.schema.column;
+
+public class IntColumnParser extends ColumnParser {
+
+    private int bits;
+    private boolean signed;
+
+    public IntColumnParser(String dataType, String colType) {
+
+        switch (dataType) {
+            case "tinyint":
+                bits = 8;
+                break;
+            case "smallint":
+                bits = 16;
+                break;
+            case "mediumint":
+                bits = 24;
+                break;
+            case "int":
+                bits = 32;
+        }
+
+        this.signed = !colType.matches(".* unsigned$");
+    }
+
+    @Override
+    public Object getValue(Object value) {
+
+        if (value == null) {
+            return null;
+        }
+
+        if (value instanceof Long) {
+            return value;
+        }
+
+        if (value instanceof Integer) {
+            Integer i = (Integer) value;
+            if (signed || i > 0) {
+                return i;
+            } else {
+                return (1L << bits) + i;
+            }
+        }
+
+        return value;
+    }
+}
diff --git a/src/main/java/org/apache/rocketmq/connect/cassandra/schema/column/SetColumnParser.java b/src/main/java/org/apache/rocketmq/connect/cassandra/schema/column/SetColumnParser.java
new file mode 100644
index 0000000..eaa6dad
--- /dev/null
+++ b/src/main/java/org/apache/rocketmq/connect/cassandra/schema/column/SetColumnParser.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.connect.cassandra.schema.column;
+
+public class SetColumnParser extends ColumnParser {
+
+    private String[] enumValues;
+
+    public SetColumnParser(String colType) {
+        enumValues = extractEnumValues(colType);
+    }
+
+    @Override
+    public Object getValue(Object value) {
+        if (value == null) {
+            return null;
+        }
+
+        if (value instanceof String) {
+            return value;
+        }
+
+        StringBuilder builder = new StringBuilder();
+        long l = (Long) value;
+
+        boolean needSplit = false;
+        for (int i = 0; i < enumValues.length; i++) {
+            if (((l >> i) & 1) == 1) {
+                if (needSplit)
+                    builder.append(",");
+
+                builder.append(enumValues[i]);
+                needSplit = true;
+            }
+        }
+
+        return builder.toString();
+    }
+}
diff --git a/src/main/java/org/apache/rocketmq/connect/cassandra/schema/column/StringColumnParser.java b/src/main/java/org/apache/rocketmq/connect/cassandra/schema/column/StringColumnParser.java
new file mode 100644
index 0000000..2bd7a36
--- /dev/null
+++ b/src/main/java/org/apache/rocketmq/connect/cassandra/schema/column/StringColumnParser.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.connect.cassandra.schema.column;
+
+import org.apache.commons.codec.Charsets;
+
+public class StringColumnParser extends ColumnParser {
+
+    private String charset;
+
+    public StringColumnParser(String charset) {
+        this.charset = charset.toLowerCase();
+    }
+
+    @Override
+    public Object getValue(Object value) {
+
+        if (value == null) {
+            return null;
+        }
+
+        if (value instanceof String) {
+            return value;
+        }
+
+        byte[] bytes = (byte[]) value;
+
+        switch (charset) {
+            case "utf8":
+            case "utf8mb4":
+                return new String(bytes, Charsets.UTF_8);
+            case "latin1":
+            case "ascii":
+                return new String(bytes, Charsets.ISO_8859_1);
+            case "ucs2":
+                return new String(bytes, Charsets.UTF_16);
+            default:
+                return new String(bytes, Charsets.toCharset(charset));
+
+        }
+    }
+}
diff --git a/src/main/java/org/apache/rocketmq/connect/cassandra/schema/column/TimeColumnParser.java b/src/main/java/org/apache/rocketmq/connect/cassandra/schema/column/TimeColumnParser.java
new file mode 100644
index 0000000..c812a53
--- /dev/null
+++ b/src/main/java/org/apache/rocketmq/connect/cassandra/schema/column/TimeColumnParser.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.connect.cassandra.schema.column;
+
+import java.sql.Time;
+import java.sql.Timestamp;
+
+public class TimeColumnParser extends ColumnParser {
+
+    @Override
+    public Object getValue(Object value) {
+
+        if (value == null) {
+            return null;
+        }
+
+        if (value instanceof Timestamp) {
+
+            return new Time(((Timestamp) value).getTime());
+        }
+
+        return value;
+    }
+}
diff --git a/src/main/java/org/apache/rocketmq/connect/cassandra/schema/column/YearColumnParser.java b/src/main/java/org/apache/rocketmq/connect/cassandra/schema/column/YearColumnParser.java
new file mode 100644
index 0000000..82d61a8
--- /dev/null
+++ b/src/main/java/org/apache/rocketmq/connect/cassandra/schema/column/YearColumnParser.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.connect.cassandra.schema.column;
+
+import java.sql.Date;
+import java.util.Calendar;
+
+public class YearColumnParser extends ColumnParser {
+
+    @Override
+    public Object getValue(Object value) {
+
+        if (value == null) {
+            return null;
+        }
+
+        if (value instanceof Date) {
+            Calendar calendar = Calendar.getInstance();
+            calendar.setTime((Date) value);
+            return calendar.get(Calendar.YEAR);
+        }
+
+        return value;
+    }
+}
diff --git a/src/main/java/org/apache/rocketmq/connect/cassandra/sink/Updater.java b/src/main/java/org/apache/rocketmq/connect/cassandra/sink/Updater.java
new file mode 100644
index 0000000..1c3f2c1
--- /dev/null
+++ b/src/main/java/org/apache/rocketmq/connect/cassandra/sink/Updater.java
@@ -0,0 +1,216 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.rocketmq.connect.cassandra.sink;
+
+
+import com.datastax.oss.driver.api.core.CqlSession;
+import com.datastax.oss.driver.api.core.cql.ResultSet;
+import com.datastax.oss.driver.api.core.cql.SimpleStatement;
+import com.datastax.oss.driver.api.core.type.DataType;
+import com.datastax.oss.driver.api.core.type.DataTypes;
+import com.datastax.oss.driver.api.querybuilder.QueryBuilder;
+import com.datastax.oss.driver.api.querybuilder.delete.Delete;
+import com.datastax.oss.driver.api.querybuilder.delete.DeleteSelection;
+import com.datastax.oss.driver.api.querybuilder.insert.InsertInto;
+import com.datastax.oss.driver.api.querybuilder.insert.RegularInsert;
+import com.datastax.oss.driver.api.querybuilder.term.Term;
+import io.openmessaging.connector.api.data.EntryType;
+import io.openmessaging.connector.api.data.Field;
+import io.openmessaging.connector.api.data.FieldType;
+import org.apache.rocketmq.connect.cassandra.config.Config;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+import java.util.Queue;
+import java.util.concurrent.ConcurrentLinkedQueue;
+
+public class Updater {
+
+    private final Logger log = LoggerFactory.getLogger(getClass());
+    private final Queue<CqlSession> connections = new ConcurrentLinkedQueue<>();
+    private Config config;
+    private CqlSession cqlSession;
+
+    private static final int BEFORE_UPDATE = 0;
+    private static final int AFTER_UPDATE = 1;
+
+    public Updater(Config config, CqlSession cqlSession) {
+        this.config = config;
+        this.cqlSession = cqlSession;
+    }
+
+    /**
+     * We cannot know the primary key of each table, so we have to put every field in the where clause
+     * @param dbName
+     * @param tableName
+     * @param fieldMap
+     * @param entryType
+     * @return
+     */
+    public boolean push(String dbName, String tableName, Map<Field, Object[]> fieldMap, EntryType entryType) {
+        log.info("Updater Trying to push data");
+        Boolean isSuccess = false;
+        boolean afterUpdateExist;
+        boolean beforeUpdateExist;
+        switch (entryType) {
+            case CREATE:
+                isSuccess = updateRow(dbName, tableName, fieldMap);
+                break;
+            case UPDATE:
+                isSuccess = updateRow(dbName, tableName, fieldMap);
+                break;
+            case DELETE:
+                isSuccess = deleteRow(dbName, tableName, fieldMap);
+                break;
+            default:
+                log.error("entryType {} is illegal.", entryType.toString());
+        }
+        return isSuccess;
+    }
+
+    public void start() throws Exception {
+        log.info("schema load success");
+    }
+
+    public Config getConfig() {
+        return config;
+    }
+
+    public void setConfig(Config config) {
+        this.config = config;
+    }
+
+
+    /** Since we have no way of getting the id of a record, and we cannot get the primary key list of a table,
+     * even we can it is not extensible. So we the result sql sentense would be like
+     * UPDATE dbName.tableName SET afterUpdateValues WHERE beforeUpdateValues.
+     *
+     */
+    private Boolean updateRow(String dbName, String tableName, Map<Field, Object[]> fieldMap) {
+        log.info("Updater.updateRow() get called ");
+        int count = 0;
+        InsertInto insert = QueryBuilder.insertInto(dbName, tableName);
+        RegularInsert regularInsert = null;
+        for (Map.Entry<Field, Object[]> entry : fieldMap.entrySet()) {
+            count++;
+            String fieldName = entry.getKey().getName();
+            FieldType fieldType = entry.getKey().getType();
+            Object fieldValue = entry.getValue()[1];
+            if (count == 1) {
+                regularInsert = insert.value(fieldName, buildTerm(fieldType, fieldValue));
+            }
+            else {
+                regularInsert = regularInsert.value(fieldName, buildTerm(fieldType, fieldValue));
+            }
+        }
+
+
+        SimpleStatement stmt;
+        boolean finishUpdate = false;
+        log.info("trying to execute sql query,{}", regularInsert.asCql());
+        try {
+            while (!cqlSession.isClosed() && !finishUpdate){
+                stmt = regularInsert.build();
+                ResultSet result = cqlSession.execute(stmt);
+                if (result.wasApplied()) {
+                    log.info("update table success, executed cql query {}", regularInsert.asCql());
+                    return true;
+                }
+                finishUpdate = true;
+            }
+        } catch (Exception e) {
+            log.error("update table error,{}", e);
+        }
+        return false;
+    }
+
+
+    private boolean deleteRow(String dbName, String tableName, Map<Field, Object[]> fieldMap) {
+        DeleteSelection deleteSelection = QueryBuilder.deleteFrom(dbName, tableName);
+        Delete delete = null;
+        int count = 0;
+        for (Map.Entry<Field, Object[]> entry : fieldMap.entrySet()) {
+            count++;
+            String fieldName = entry.getKey().getName();
+            FieldType fieldType = entry.getKey().getType();
+            Object fieldValue = entry.getValue()[1];
+            if (count == 1) {
+                delete = deleteSelection.whereColumn(fieldName)
+                    .isEqualTo(buildTerm(fieldType, fieldValue));
+            }
+            else {
+                delete = delete.whereColumn(fieldName)
+                    .isEqualTo(buildTerm(fieldType, fieldValue));
+            }
+        }
+
+        boolean finishDelete = false;
+        SimpleStatement stmt = delete.build();
+        try {
+            while (!cqlSession.isClosed() && !finishDelete){
+                ResultSet result = cqlSession.execute(stmt);
+                if (result.wasApplied()) {
+                    log.info("delete from table success, executed query {}", delete);
+                    return true;
+                }
+                finishDelete = true;
+            }
+        } catch (Exception e) {
+            log.error("delete from table error,{}", e);
+        }
+        return false;
+    }
+
+
+    /**
+     * Cassandra datastax driver automatically
+     * infer type from literal value, or we can use "typeHints" to
+     * tell datastax driver what type should this  literal be. We will add
+     * type hints utils once we found it is necessary to do so. For now literal
+     * inference should be enough.
+     *
+     * @param fieldType
+     * @param fieldValue
+     * @return
+     */
+    private Term buildTerm(FieldType fieldType, Object fieldValue) {
+        return QueryBuilder.literal(fieldValue);
+    }
+    private DataType typeParser(FieldType fieldType) {
+        DataType dataType = null;
+        switch (fieldType) {
+            case STRING:
+                break;
+            case DATETIME:
+                break;
+            case INT32:
+            case INT64:
+            case FLOAT32:
+            case FLOAT64:
+            case BIG_INTEGER:
+                dataType = DataTypes.BIGINT;
+                break;
+            default:
+                log.error("fieldType {} is illegal.", fieldType.toString());
+        }
+        return dataType;
+    }
+
+}
diff --git a/src/main/java/org/apache/rocketmq/connect/cassandra/source/Querier.java b/src/main/java/org/apache/rocketmq/connect/cassandra/source/Querier.java
new file mode 100644
index 0000000..8a8ddf4
--- /dev/null
+++ b/src/main/java/org/apache/rocketmq/connect/cassandra/source/Querier.java
@@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.rocketmq.connect.cassandra.source;
+
+import com.alibaba.fastjson.JSONObject;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Queue;
+import java.util.concurrent.ConcurrentLinkedQueue;
+
+import com.datastax.oss.driver.api.core.CqlSession;
+import com.datastax.oss.driver.api.core.cql.ResultSet;
+import com.datastax.oss.driver.api.core.cql.Row;
+import com.datastax.oss.driver.api.core.cql.SimpleStatement;
+import com.datastax.oss.driver.api.querybuilder.QueryBuilder;
+import com.datastax.oss.driver.api.querybuilder.select.Select;
+import com.datastax.oss.driver.api.querybuilder.select.SelectFrom;
+import org.apache.rocketmq.connect.cassandra.config.Config;
+import org.apache.rocketmq.connect.cassandra.schema.Database;
+import org.apache.rocketmq.connect.cassandra.schema.Schema;
+import org.apache.rocketmq.connect.cassandra.schema.Table;
+import org.apache.rocketmq.connect.cassandra.schema.column.ColumnParser;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class Querier {
+
+    private final Logger log = LoggerFactory.getLogger(Querier.class); // use concrete subclass
+    protected String topicPrefix;
+    private final Queue<CqlSession> cqlSessions = new ConcurrentLinkedQueue<>();
+    private Config config;
+    private CqlSession cqlSession;
+    private List<Table> list = new LinkedList<>();
+    private String mode;
+    private Schema schema;
+
+    public Querier(){
+
+    }
+
+    public Querier(Config config, CqlSession cqlSession) {
+        this.config = config;
+        this.cqlSession = cqlSession;
+        this.schema = new Schema(cqlSession);
+    }
+
+    public Config getConfig() {
+        return config;
+    }
+
+    public void setConfig(Config config) {
+        this.config = config;
+    }
+
+    public List<Table> getList() {
+        return list;
+    }
+
+
+    public void poll()  {
+        try {
+            LinkedList<Table> tableLinkedList = new LinkedList<>();
+
+            for (Map.Entry<String, Database> entry : schema.getDbMap().entrySet()) {
+                String dbName = entry.getKey();
+                Iterator<Map.Entry<String, Table>> iterator = entry.getValue().getTableMap().entrySet().iterator();
+                while (iterator.hasNext()) {
+                    Map.Entry<String, Table> tableEntry = iterator.next();
+                    String tableName = tableEntry.getKey();
+                    Table table = tableEntry.getValue();
+                    Map<String, String> tableFilterMap = table.getFilterMap();
+
+
+                    Select selectFrom = QueryBuilder.selectFrom(dbName, tableName).all();
+                    if (tableFilterMap != null && !tableFilterMap.keySet().contains("NO-FILTER")){
+                        for (String key: tableFilterMap.keySet()) {
+                            String value = tableFilterMap.get(key);
+                            selectFrom.whereColumn(key).isEqualTo(QueryBuilder.literal(value));
+                        }
+                    }
+
+
+                    SimpleStatement stmt;
+                    boolean finishUpdate = false;
+                    log.info("trying to execute sql query,{}", selectFrom.asCql());
+                    ResultSet result = null;
+                    while (!cqlSession.isClosed() && !finishUpdate){
+                        stmt = selectFrom.build();
+                        result = cqlSession.execute(stmt);
+                        if (result.wasApplied()) {
+                            log.info("query columns success, executed cql query {}", selectFrom.asCql());
+                        }
+                        finishUpdate = true;
+                    }
+
+                    List<String> colList = tableEntry.getValue().getColList();
+                    List<String> dataTypeList = tableEntry.getValue().getRawDataTypeList();
+                    List<ColumnParser> parserList = tableEntry.getValue().getParserList();
+
+                    for (Row row : result) {
+                        Table tableWithData = new Table(dbName, tableName);
+                        tableWithData.setColList(colList);
+                        tableWithData.setRawDataTypeList(dataTypeList);
+                        tableWithData.setParserList(parserList);
+
+                        for (String col : colList) {
+                            tableWithData.getDataList().add(row.getObject(col));
+                        }
+                        tableLinkedList.add(tableWithData);
+                    }
+                }
+            }
+            list = tableLinkedList;
+        } catch (Exception e) {
+            log.error("fail to poll data, {}", e);
+        }
+
+    }
+
+    public void start() throws Exception {
+        String whiteDataBases = config.getWhiteDataBase();
+        JSONObject whiteDataBaseObject = JSONObject.parseObject(whiteDataBases);
+
+        if (whiteDataBaseObject != null){
+            for (String whiteDataBaseName : whiteDataBaseObject.keySet()){
+                JSONObject whiteTableObject = (JSONObject)whiteDataBaseObject.get(whiteDataBaseName);
+                HashSet<String> whiteTableSet = new HashSet<>();
+                for (String whiteTableName : whiteTableObject.keySet()){
+                    Collections.addAll(whiteTableSet, whiteTableName);
+                    HashMap<String, String> filterMap = new HashMap<>();
+                    JSONObject tableFilterObject = JSONObject.parseObject(whiteTableObject.get(whiteTableName).toString());
+                    for(String filterKey : tableFilterObject.keySet()){
+                        filterMap.put(filterKey, tableFilterObject.getString(filterKey));
+                    }
+                    schema.tableFilterMap.put(whiteTableName, filterMap);
+                }
+                schema.dbTableMap.put(whiteDataBaseName, whiteTableSet);
+            }
+        }
+        schema.load();
+        log.info("load schema success");
+    }
+}
diff --git a/src/main/java/org/apache/rocketmq/connect/cassandra/strategy/DivideStrategyEnum.java b/src/main/java/org/apache/rocketmq/connect/cassandra/strategy/DivideStrategyEnum.java
new file mode 100644
index 0000000..f0eac2b
--- /dev/null
+++ b/src/main/java/org/apache/rocketmq/connect/cassandra/strategy/DivideStrategyEnum.java
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.connect.cassandra.strategy;
+
+public enum DivideStrategyEnum {
+
+    BY_TOPIC,
+    BY_QUEUE
+}
diff --git a/src/main/java/org/apache/rocketmq/connect/cassandra/strategy/DivideTaskByTopic.java b/src/main/java/org/apache/rocketmq/connect/cassandra/strategy/DivideTaskByTopic.java
new file mode 100644
index 0000000..68395d3
--- /dev/null
+++ b/src/main/java/org/apache/rocketmq/connect/cassandra/strategy/DivideTaskByTopic.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.connect.cassandra.strategy;
+
+import com.alibaba.fastjson.JSONObject;
+import io.openmessaging.KeyValue;
+import io.openmessaging.connector.api.Task;
+import io.openmessaging.internal.DefaultKeyValue;
+import org.apache.rocketmq.connect.cassandra.config.*;
+
+
+import java.util.*;
+
+public class DivideTaskByTopic extends TaskDivideStrategy {
+    @Override
+    public List<KeyValue> divide(DbConnectorConfig dbConnectorConfig, TaskDivideConfig tdc) {
+        if (dbConnectorConfig instanceof SourceDbConnectorConfig){
+            return divideSourceTaskByTopic(dbConnectorConfig, tdc);
+        }else {
+            return divideSinkTaskByTopic(dbConnectorConfig, tdc);
+        }
+    }
+
+    private List<KeyValue> divideSinkTaskByTopic(DbConnectorConfig dbConnectorConfig, TaskDivideConfig tdc) {
+        List<KeyValue> config = new ArrayList<KeyValue>();
+        int parallelism = tdc.getTaskParallelism();
+        int id = -1;
+        Set<String> topicRouteSet = ((SinkDbConnectorConfig)dbConnectorConfig).getWhiteTopics();
+        Map<Integer, StringBuilder> taskTopicList = new HashMap<>();
+        for (String topicName : topicRouteSet) {
+            int ind = ++id % parallelism;
+            if (!taskTopicList.containsKey(ind)) {
+                taskTopicList.put(ind, new StringBuilder(topicName));
+            } else {
+                taskTopicList.get(ind).append(",").append(topicName);
+            }
+        }
+
+        for (int i = 0; i < parallelism; i++) {
+            KeyValue keyValue = new DefaultKeyValue();
+            keyValue.put(Config.CONN_DB_IP, tdc.getDbUrl());
+            keyValue.put(Config.CONN_DB_PORT, tdc.getDbPort());
+            keyValue.put(Config.CONN_DB_USERNAME, tdc.getDbUserName());
+            keyValue.put(Config.CONN_DB_PASSWORD, tdc.getDbPassword());
+            keyValue.put(Config.CONN_DB_DATACENTER, tdc.getLocalDataCenter());
+            keyValue.put(Config.CONN_TOPIC_NAMES, taskTopicList.get(i).toString());
+            keyValue.put(Config.CONN_DATA_TYPE, tdc.getDataType());
+            keyValue.put(Config.CONN_SOURCE_RECORD_CONVERTER, tdc.getSrcRecordConverter());
+            keyValue.put(Config.CONN_DB_MODE, tdc.getMode());
+            config.add(keyValue);
+        }
+
+        return config;
+    }
+
+    private List<KeyValue> divideSourceTaskByTopic(DbConnectorConfig dbConnectorConfig, TaskDivideConfig tdc) {
+        List<KeyValue> config = new ArrayList<KeyValue>();
+        int parallelism = tdc.getTaskParallelism();
+        int id = -1;
+        Map<String, String> topicRouteMap = ((SourceDbConnectorConfig)dbConnectorConfig).getWhiteTopics();
+        Map<Integer, Map<String, Map<String, String>>> taskTopicList = new HashMap<>();
+        for (Map.Entry<String, String> entry : topicRouteMap.entrySet()) {
+            int ind = ++id % parallelism;
+            if (!taskTopicList.containsKey(ind)) {
+                taskTopicList.put(ind, new HashMap<>());
+            }
+            String dbKey = entry.getKey().split("-")[0];
+            String tableKey = entry.getKey().split("-")[1];
+            String filter = entry.getValue();
+            Map<String, String> tableMap = new HashMap<>();
+            tableMap.put(tableKey, filter);
+            if(!taskTopicList.get(ind).containsKey(dbKey)){
+                taskTopicList.get(ind).put(dbKey, tableMap);
+            }else {
+                taskTopicList.get(ind).get(dbKey).putAll(tableMap);
+            }
+        }
+
+        for (int i = 0; i < parallelism; i++) {
+            KeyValue keyValue = new DefaultKeyValue();
+
+            keyValue.put(Config.CONN_DB_IP, tdc.getDbUrl());
+            keyValue.put(Config.CONN_DB_PORT, tdc.getDbPort());
+            keyValue.put(Config.CONN_DB_USERNAME, tdc.getDbUserName());
+            keyValue.put(Config.CONN_DB_PASSWORD, tdc.getDbPassword());
+            keyValue.put(Config.CONN_DB_DATACENTER, tdc.getLocalDataCenter());
+            keyValue.put(Config.CONN_WHITE_LIST, JSONObject.toJSONString(taskTopicList.get(i)));
+            keyValue.put(Config.CONN_DATA_TYPE, tdc.getDataType());
+            keyValue.put(Config.CONN_SOURCE_RECORD_CONVERTER, tdc.getSrcRecordConverter());
+            keyValue.put(Config.CONN_DB_MODE, tdc.getMode());
+            config.add(keyValue);
+        }
+
+        return config;
+    }
+}
diff --git a/src/main/java/org/apache/rocketmq/connect/cassandra/strategy/TaskDivideStrategy.java b/src/main/java/org/apache/rocketmq/connect/cassandra/strategy/TaskDivideStrategy.java
new file mode 100644
index 0000000..70a6773
--- /dev/null
+++ b/src/main/java/org/apache/rocketmq/connect/cassandra/strategy/TaskDivideStrategy.java
@@ -0,0 +1,32 @@
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.connect.cassandra.strategy;
+
+import io.openmessaging.KeyValue;
+import io.openmessaging.connector.api.Task;
+import org.apache.rocketmq.connect.cassandra.config.DbConnectorConfig;
+import org.apache.rocketmq.connect.cassandra.config.TaskDivideConfig;
+import org.apache.rocketmq.connect.cassandra.config.TaskTopicInfo;
+
+import java.util.List;
+import java.util.Map;
+
+public abstract class TaskDivideStrategy {
+    public abstract List<KeyValue> divide(DbConnectorConfig dbConnectorConfig, TaskDivideConfig tdc);
+}