You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by zh...@apache.org on 2022/03/02 03:19:17 UTC
[rocketmq-connect] 01/05: [ISSUE #570] ASoC runtime optimization: Cassandra connectors (#587)
This is an automated email from the ASF dual-hosted git repository.
zhoubo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-connect.git
commit e2cc843ef4926a98797ca76880579941d5363fc6
Author: affe <af...@gmail.com>
AuthorDate: Tue Jul 28 11:41:32 2020 +0800
[ISSUE #570] ASoC runtime optimization: Cassandra connectors (#587)
* feat(connect-cassanra) merge affe's cassandra connector implementation
* Merge master into current branch : deleted .iml in connect-jms
* style(connect-cassandra): resolve all TODOs and meaningless comments
---
README.md | 97 +++++++
pom.xml | 276 ++++++++++++++++++++
scripts/gen_data.py | 155 +++++++++++
scripts/requirements.txt | 2 +
.../connect/cassandra/common/CloneUtils.java | 44 ++++
.../connect/cassandra/common/ConstDefine.java | 23 ++
.../rocketmq/connect/cassandra/common/DBUtils.java | 91 +++++++
.../connect/cassandra/common/DataType.java | 26 ++
.../rocketmq/connect/cassandra/common/Utils.java | 76 ++++++
.../rocketmq/connect/cassandra/config/Config.java | 282 +++++++++++++++++++++
.../connect/cassandra/config/ConfigUtil.java | 70 +++++
.../cassandra/config/DbConnectorConfig.java | 110 ++++++++
.../cassandra/config/SinkDbConnectorConfig.java | 112 ++++++++
.../cassandra/config/SourceDbConnectorConfig.java | 87 +++++++
.../connect/cassandra/config/TaskDivideConfig.java | 123 +++++++++
.../connect/cassandra/config/TaskTopicInfo.java | 40 +++
.../connector/CassandraSinkConnector.java | 240 ++++++++++++++++++
.../cassandra/connector/CassandraSinkTask.java | 161 ++++++++++++
.../connector/CassandraSourceConnector.java | 108 ++++++++
.../cassandra/connector/CassandraSourceTask.java | 168 ++++++++++++
.../connect/cassandra/schema/Database.java | 140 ++++++++++
.../rocketmq/connect/cassandra/schema/Schema.java | 146 +++++++++++
.../rocketmq/connect/cassandra/schema/Table.java | 103 ++++++++
.../schema/column/BigIntColumnParser.java | 50 ++++
.../schema/column/BooleanColumnParser.java | 34 +++
.../cassandra/schema/column/ColumnParser.java | 118 +++++++++
.../schema/column/DateTimeColumnParser.java | 53 ++++
.../schema/column/DefaultColumnParser.java | 37 +++
.../cassandra/schema/column/EnumColumnParser.java | 46 ++++
.../cassandra/schema/column/IntColumnParser.java | 66 +++++
.../cassandra/schema/column/SetColumnParser.java | 54 ++++
.../schema/column/StringColumnParser.java | 57 +++++
.../cassandra/schema/column/TimeColumnParser.java | 39 +++
.../cassandra/schema/column/YearColumnParser.java | 40 +++
.../rocketmq/connect/cassandra/sink/Updater.java | 216 ++++++++++++++++
.../rocketmq/connect/cassandra/source/Querier.java | 164 ++++++++++++
.../cassandra/strategy/DivideStrategyEnum.java | 23 ++
.../cassandra/strategy/DivideTaskByTopic.java | 110 ++++++++
.../cassandra/strategy/TaskDivideStrategy.java | 32 +++
39 files changed, 3819 insertions(+)
diff --git a/README.md b/README.md
new file mode 100644
index 0000000..6439269
--- /dev/null
+++ b/README.md
@@ -0,0 +1,97 @@
+# rocketmq-connect-cassandra
+
+## rocketmq-connect-cassandra 打包
+```
+mvn clean install -DskipTests -U
+```
+
+## 目前安装会遇到的问题
+
+目前的rocketmq-connect-cassandra 使用的是datastax-java-driver:4.5.0版本的cassandra-driver,由于在打包过程中还有没有解决的问题,该cassandra driver无法读取
+位于driver包中的默认配置文件,因此我们需要手动下载cassandra driver的配置文件[reference.conf](https://github.com/datastax/java-driver/blob/4.5.0/core/src/main/resources/reference.conf) 并将其放置于classpath中。
+
+该问题还仍然在解决的过程中。
+
+## rocketmq-connect-cassandra 启动
+
+* **cassandra-source-connector** 启动
+
+```
+http://${runtime-ip}:${runtime-port}/connectors/${rocketmq-cassandra-source-connector-name}
+?config={"connector-class":"org.apache.rocketmq.connect.cassandra.connector.CassandraSourceConnector","dbUrl":"${source-db-ip}","dbPort":"${source-db-port}","dbUsername":"${source-db-username}","dbPassword":"${source-db-password}","rocketmqTopic":"cassandraTopic","mode":"bulk","whiteDataBase":{"${source-db-name}":{"${source-table-name}":{"${source-column-name}":"${source-column-value}"}}},"source-record-converter":"org.apache.rocketmq.connect.runtime.converter.JsonConverter"}
+```
+
+* **cassandra-sink-connector** 启动
+
+```
+http://${runtime-ip}:${runtime-port}/connectors/${rocketmq-cassandra-sink-connector-name}
+?config={"connector-class":"org.apache.rocketmq.connect.cassandra.connector.CassandraSinkConnector","dbUrl":"${sink-db-ip}","dbPort":"${sink-db-port}","dbUsername":"${sink-db-username}","dbPassword":"${sink-db-password}","rocketmqTopic":"cassandraTopic","mode":"bulk","topicNames":"${sink-topic-name}","source-record-converter":"org.apache.rocketmq.connect.runtime.converter.JsonConverter"}
+```
+>**注:** `rocketmq-cassandra-connect` 的启动依赖于`rocketmq-connect-runtime`项目的启动,需将打好的`jar`包放置到`runtime`项目中`pluginPaths`配置的路径后再执行上面的启动请求,该值配置在`runtime`项目下的`connect.conf`文件中
+
+## rocketmq-connect-cassandra 停止
+
+```
+http://${runtime-ip}:${runtime-port}/connectors/${rocketmq-cassandra-connector-name}/stop
+```
+
+## rocketmq-connect-cassandra 参数说明
+* **cassandra-source-connector 参数说明**
+
+参数 | 类型 | 是否必须 | 描述 | 样例
+|---|---|---|---|---|
+|dbUrl | String | 是 | source端 DB ip | 192.168.1.2|
+|dbPort | String | 是 | source端 DB port | 3306 |
+|dbUsername | String | 是 | source端 DB 用户名 | root |
+|dbPassword | String | 是 | source端 DB 密码 | 123456 |
+|rocketmqTopic | String | 是 | 待废弃的参数,需和topicNames相同 | jdbc_cassandra |
+|topicNames | String | 是 | rocketmq默认每一个数据源中的表对应一个名字,该名称需和数据库表名称相同 | jdbc_cassandra |
+|whiteDataBase | String | 是 | source端同步数据白名单,嵌套配置,为{DB名:{表名:{字段名:字段值}}},若无指定字段数据同步,字段名可设为NO-FILTER,值为任意 | {"DATABASE_TEST":{"TEST_DATA":{"name":"test"}}} |
+|mode | String | 是 | source-connector 模式,目前仅支持bulk | bulk |
+|localDataCenter | String | 是 | cassandra 集群的datacenter名称,为必填项(待废弃) | datacenter1 |
+|task-divide-strategy | Integer | 否 | task 分配策略, 默认值为 0,表示按照topic分配任务,每一个table便是一个topic | 0 |
+|task-parallelism | Integer | 否 | task parallelism,默认值为 1,表示将topic拆分为多少个任务进行执行 | 2 |
+|source-cluster | String | 是 | sink 端获取路由信息连接到的RocketMQ broker cluster 地址 | 172.17.0.1:10911 |
+|source-rocketmq | String | 是 | sink 端获取路由信息连接到的RocketMQ nameserver 地址 | 127.0.0.1:9876 |
+|source-record-converter | String | 是 | source data 解析 | org.apache.rocketmq.connect.runtime.converter.JsonConverter |
+
+
+示例配置如下
+```js
+{
+ "connector-class":"org.apache.rocketmq.connect.cassandra.connector.CassandraSourceConnector",
+ "rocketmqTopic":"jdbc_cassandra",
+ "topicNames": "jdbc_cassandra",
+ "dbUrl":"127.0.0.1",
+ "dbPort":"9042",
+ "dbUsername":"cassandra",
+ "dbPassword":"cassandra",
+ "localDataCenter":"datacenter1",
+ "whiteDataBase": {
+ "jdbc":{
+ "jdbc_cassandra": {"NO-FILTER": "10"}
+ }
+ },
+ "mode": "bulk",
+ "task-parallelism": 1,
+ "source-cluster": "172.17.0.1:10911",
+ "source-rocketmq": "127.0.0.1:9876",
+ "source-record-converter":"org.apache.rocketmq.connect.runtime.converter.JsonConverter"
+ }
+```
+* **cassandra-sink-connector 参数说明**
+
+参数 | 类型 | 是否必须 | 描述 | 样例
+|---|---|---|---|---|
+|dbUrl | String | 是 | sink端 DB ip | 192.168.1.2|
+|dbPort | String | 是 | sink端 DB port | 3306 |
+|dbUsername | String | 是 | sink端 DB 用户名 | root |
+|dbPassword | String | 是 | sink端 DB 密码 | 123456 |
+|topicNames | String | 是 | sink端同步数据的topic名字 | topic-1,topic-2 |
+|mode | String | 是 | source-connector 模式,目前仅支持bulk | bulk |
+|~~rocketmqTopic~~ | String | 是 | 待废弃 | cassandraTopic |
+|task-divide-strategy | Integer | 否 | task 分配策略, 默认值为 0,表示按照topic分配任务,每一个table便是一个topic | 0 |
+|task-parallelism | Integer | 否 | task parallelism,默认值为 1,表示将topic拆分为多少个任务进行执行 | 2 |
+|source-rocketmq | String | 是 | sink 端获取路由信息连接到的RocketMQ nameserver 地址 | 127.0.0.1:9876 |
+|source-cluster | String | 是 | sink 端获取路由信息连接到的RocketMQ broker cluster 地址 | 172.17.0.1:10911 |
+|source-record-converter | String | 是 | source data 解析 | org.apache.rocketmq.connect.runtime.converter.JsonConverter |
diff --git a/pom.xml b/pom.xml
new file mode 100644
index 0000000..286a6ef
--- /dev/null
+++ b/pom.xml
@@ -0,0 +1,276 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!-- Licensed to the Apache Software Foundation (ASF) under one or more contributor
+ license agreements. See the NOTICE file distributed with this work for additional
+ information regarding copyright ownership. The ASF licenses this file to
+ You under the Apache License, Version 2.0 (the "License"); you may not use
+ this file except in compliance with the License. You may obtain a copy of
+ the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required
+ by applicable law or agreed to in writing, software distributed under the
+ License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS
+ OF ANY KIND, either express or implied. See the License for the specific
+ language governing permissions and limitations under the License. -->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+ <groupId>org.apache.rocketmq</groupId>
+ <artifactId>rocketmq-connect-cassandra</artifactId>
+ <version>0.0.1-SNAPSHOT</version>
+
+ <name>connect-cassandra</name>
+ <licenses>
+ <license>
+ <name>The Apache Software License, Version 2.0</name>
+ <url>http://www.apache.org/licenses/LICENSE-2.0.txt</url>
+ </license>
+ </licenses>
+
+ <issueManagement>
+ <system>jira</system>
+ <url>https://issues.apache.org/jira/browse/RocketMQ</url>
+ </issueManagement>
+
+ <properties>
+ <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+ <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
+
+ <!-- Compiler settings properties -->
+ <maven.compiler.source>1.8</maven.compiler.source>
+ <maven.compiler.target>1.8</maven.compiler.target>
+ <rocketmq.version>4.5.2</rocketmq.version>
+ </properties>
+
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.codehaus.mojo</groupId>
+ <artifactId>versions-maven-plugin</artifactId>
+ <version>2.3</version>
+ </plugin>
+ <plugin>
+ <groupId>org.codehaus.mojo</groupId>
+ <artifactId>clirr-maven-plugin</artifactId>
+ <version>2.7</version>
+ </plugin>
+ <plugin>
+ <artifactId>maven-dependency-plugin</artifactId>
+ <configuration>
+ <outputDirectory>${project.build.directory}/lib</outputDirectory>
+ <excludeTransitive>false</excludeTransitive>
+ <stripVersion>true</stripVersion>
+ </configuration>
+ </plugin>
+ <plugin>
+ <artifactId>maven-compiler-plugin</artifactId>
+ <version>3.6.1</version>
+ <configuration>
+ <source>${maven.compiler.source}</source>
+ <target>${maven.compiler.target}</target>
+ <compilerVersion>${maven.compiler.source}</compilerVersion>
+ <showDeprecation>true</showDeprecation>
+ <showWarnings>true</showWarnings>
+ </configuration>
+ </plugin>
+ <plugin>
+ <artifactId>maven-surefire-plugin</artifactId>
+ <version>2.19.1</version>
+ <configuration>
+ <argLine>-Xms512m -Xmx1024m</argLine>
+ <forkMode>always</forkMode>
+ <includes>
+ <include>**/*Test.java</include>
+ </includes>
+ </configuration>
+ </plugin>
+ <plugin>
+ <artifactId>maven-site-plugin</artifactId>
+ <version>3.6</version>
+ <configuration>
+ <locales>en_US</locales>
+ <outputEncoding>UTF-8</outputEncoding>
+ <inputEncoding>UTF-8</inputEncoding>
+ </configuration>
+ </plugin>
+ <plugin>
+ <artifactId>maven-source-plugin</artifactId>
+ <version>3.0.1</version>
+ <executions>
+ <execution>
+ <id>attach-sources</id>
+ <goals>
+ <goal>jar</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+ <plugin>
+ <groupId>org.apache.rat</groupId>
+ <artifactId>apache-rat-plugin</artifactId>
+ <version>0.12</version>
+ <configuration>
+ <excludes>
+ <exclude>README.md</exclude>
+ </excludes>
+ </configuration>
+ </plugin>
+ <plugin>
+ <artifactId>maven-javadoc-plugin</artifactId>
+ <version>2.10.4</version>
+ <configuration>
+ <charset>UTF-8</charset>
+ <locale>en_US</locale>
+ <excludePackageNames>io.openmessaging.internal</excludePackageNames>
+ </configuration>
+ <executions>
+ <execution>
+ <id>aggregate</id>
+ <goals>
+ <goal>aggregate</goal>
+ </goals>
+ <phase>site</phase>
+ </execution>
+ </executions>
+ </plugin>
+ <plugin>
+ <artifactId>maven-resources-plugin</artifactId>
+ <version>3.0.2</version>
+ <configuration>
+ <encoding>${project.build.sourceEncoding}</encoding>
+ </configuration>
+ </plugin>
+ <plugin>
+ <groupId>org.codehaus.mojo</groupId>
+ <artifactId>findbugs-maven-plugin</artifactId>
+ <version>3.0.4</version>
+ </plugin>
+ <plugin>
+ <artifactId>maven-assembly-plugin</artifactId>
+ <version>3.0.0</version>
+ <configuration>
+ <archive>
+ <!-- The main class declared here has little meaning, since the connector is loaded dynamically -->
+ <manifest>
+ <mainClass>org.apache.rocketmq.connect.cassandra.connector.CassandraSinkConnector</mainClass>
+ </manifest>
+ </archive>
+ <descriptorRefs>
+ <descriptorRef>jar-with-dependencies</descriptorRef>
+ </descriptorRefs>
+ </configuration>
+ <executions>
+ <execution>
+ <id>make-assembly</id>
+ <phase>package</phase>
+ <goals>
+ <goal>single</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ </build>
+
+ <dependencies>
+ <dependency>
+ <groupId>junit</groupId>
+ <artifactId>junit</artifactId>
+ <version>4.11</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.assertj</groupId>
+ <artifactId>assertj-core</artifactId>
+ <version>2.6.0</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.mockito</groupId>
+ <artifactId>mockito-core</artifactId>
+ <version>2.6.3</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>commons-codec</groupId>
+ <artifactId>commons-codec</artifactId>
+ <version>1.12</version>
+ </dependency>
+ <dependency>
+ <groupId>io.openmessaging</groupId>
+ <artifactId>openmessaging-connector</artifactId>
+ <version>0.1.1</version>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>io.openmessaging</groupId>
+ <artifactId>openmessaging-api</artifactId>
+ <version>0.3.1-alpha</version>
+ </dependency>
+ <dependency>
+ <groupId>com.alibaba</groupId>
+ <artifactId>fastjson</artifactId>
+ <version>1.2.60</version>
+ </dependency>
+ <dependency>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-api</artifactId>
+ <version>1.7.7</version>
+ </dependency>
+ <dependency>
+ <groupId>ch.qos.logback</groupId>
+ <artifactId>logback-classic</artifactId>
+ <version>1.0.13</version>
+ </dependency>
+ <dependency>
+ <groupId>ch.qos.logback</groupId>
+ <artifactId>logback-core</artifactId>
+ <version>1.0.13</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.rocketmq</groupId>
+ <artifactId>rocketmq-client</artifactId>
+ <version>${rocketmq.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.rocketmq</groupId>
+ <artifactId>rocketmq-tools</artifactId>
+ <version>${rocketmq.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.rocketmq</groupId>
+ <artifactId>rocketmq-remoting</artifactId>
+ <version>${rocketmq.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.rocketmq</groupId>
+ <artifactId>rocketmq-openmessaging</artifactId>
+ <version>4.3.2</version>
+ </dependency>
+ <dependency>
+ <groupId>commons-cli</groupId>
+ <artifactId>commons-cli</artifactId>
+ <version>1.2</version>
+ </dependency>
+ <dependency>
+ <groupId>io.javalin</groupId>
+ <artifactId>javalin</artifactId>
+ <version>1.3.0</version>
+ </dependency>
+ <dependency>
+ <groupId>com.alibaba</groupId>
+ <artifactId>druid</artifactId>
+ <version>1.0.31</version>
+ </dependency>
+ <dependency>
+ <groupId>com.datastax.oss</groupId>
+ <artifactId>java-driver-core-shaded</artifactId>
+ <version>4.5.1</version>
+ </dependency>
+ <dependency>
+ <groupId>com.datastax.oss</groupId>
+ <artifactId>java-driver-query-builder</artifactId>
+ <version>4.5.1</version>
+ </dependency>
+ </dependencies>
+
+</project>
diff --git a/scripts/gen_data.py b/scripts/gen_data.py
new file mode 100644
index 0000000..8fde504
--- /dev/null
+++ b/scripts/gen_data.py
@@ -0,0 +1,155 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+
+from sqlalchemy import *
+from sqlalchemy.ext.declarative import declarative_base
+from sqlalchemy import Column, String
+from sqlalchemy.orm import sessionmaker
+import argparse
+import random
+import string
+import datetime
+import time
+import uuid
+
+
+
+# Declarative base that the ORM model below is registered on; main() uses
+# Base.metadata to create the table.
+Base = declarative_base()
+class JdbcCassandra(Base):
+ """ORM model for the ``jdbc_cassandra`` table that this script fills with test rows.
+
+ ``int_type`` is the auto-incrementing primary key. The remaining columns are
+ named after data types (presumably the Cassandra types they are meant to
+ exercise -- TODO confirm) and are declared with SQL column types.
+ """
+ __tablename__ = 'jdbc_cassandra'
+ int_type = Column(INT, primary_key=True, autoincrement=True)
+ ascii_type = Column(VARCHAR(50))
+ boolean_type = Column(BOOLEAN)
+ date_type = Column(DATE)
+ decimal_type = Column(DECIMAL)
+ double_type = Column(FLOAT)
+ float_type = Column(FLOAT)
+ inet_type = Column(VARCHAR(50))
+ smallint_type = Column(SMALLINT)
+ time_type = Column(TIME)
+ text_type = Column(TEXT)
+ timestamp_type = Column(TIMESTAMP)
+ #timeuuid_type = Column(CHAR(36))
+ tinyint_type = Column(SMALLINT)
+ #uuid_type = Column(CHAR(36))
+ varchar_type = Column(VARCHAR(50))
+ varint_type = Column(BIGINT)
+
+
+
+def random_string(strlen=8):
+ letters = string.ascii_lowercase
+ return bytes(''.join(random.choice(letters) for i in range(strlen)), 'utf8')
+
+
+def str_time_prop(start, end, format, prop):
+ """Get a time at a proportion of a range of two formatted times.
+
+ start and end should be strings specifying times formated in the
+ given format (strftime-style), giving an interval [start, end].
+ prop specifies how a proportion of the interval to be taken after
+ start. The returned time will be in the specified format.
+ """
+
+ stime = time.mktime(time.strptime(start, format))
+ etime = time.mktime(time.strptime(end, format))
+
+ ptime = stime + prop * (etime - stime)
+
+ return time.strftime(format, time.localtime(ptime))
+
+
+def random_date(start, end, prop):
+ return bytes(str_time_prop(start, end, '%m/%d/%Y %I:%M %p', prop), 'utf8')
+
+def random_time(start, end, prop):
+ return bytes(str_time_prop(start, end, '%Y-%m-%d %H:%M:%S', prop), 'utf8')
+
+def format_time():
+ t = datetime.datetime.now()
+ s = t.strftime('%Y-%m-%d %H:%M:%S.%f')
+ return s[:-3]
+
+def main():
+
+ # define parser
+ parser = argparse.ArgumentParser()
+ parser.add_argument('hostname', metavar='HOSTNAME', type=str,
+ help='hostname of target mysql database ')
+ parser.add_argument('port', metavar='PORT', type=str,
+ help='port of target mysql database')
+ parser.add_argument('username', metavar='USERNAME', type=str,
+ help='username of the user connecting to mysql')
+ parser.add_argument('password', metavar='PASSWORD', type=str,
+ help='password of specified user')
+ parser.add_argument('database', metavar='DATABASE', type=str,
+ help='which database to connect to')
+ parser.add_argument('count', metavar='COUNT', type=int,
+ help='how many random records to insert into the database')
+ args = parser.parse_args()
+
+
+ # get variabless from command line
+ hostname = args.hostname
+ port = args.port
+ username = args.username
+ password = args.password
+ database = args.database
+ count = args.count
+
+
+ # create db connection
+ # print("----------------ERROR-------------")
+ # print("mysql+pymysql://{}:{}@{}:{}/{}".format(username, password, hostname, port, database))
+ engine = create_engine("mysql+pymysql://{}:{}@{}:{}/{}".format(username, password, hostname, port, database), echo=True)
+
+
+ # create table if not exist
+ Base.metadata.create_all(engine)
+
+ # create a session
+ Session = sessionmaker(bind=engine)
+ session = Session()
+
+ for i in range(0, count):
+ random_record = JdbcCassandra(
+ ascii_type = random_string(30),
+ boolean_type = (i % 2 == 0),
+ date_type = datetime.datetime.now().date(),
+ decimal_type = 10.5,
+ double_type = 1.5,
+ float_type = 8.3,
+ inet_type = "127.0.0.1",
+ smallint_type = 1,
+ time_type = datetime.datetime.now().time(),
+ text_type = random_string(30),
+ timestamp_type = datetime.datetime.now(),
+ #timeuuid_type = str(uuid.uuid1()),
+ tinyint_type = 1,
+ #uuid_type = str(uuid.uuid1()),
+ varchar_type = random_string(30),
+ varint_type = random.getrandbits(63),
+ )
+ session.add(random_record)
+
+ session.commit()
+
+
+# Run the generator only when this file is executed as a script, not when it is imported.
+if __name__ == "__main__":
+ main()
\ No newline at end of file
diff --git a/scripts/requirements.txt b/scripts/requirements.txt
new file mode 100644
index 0000000..fc7dc31
--- /dev/null
+++ b/scripts/requirements.txt
@@ -0,0 +1,2 @@
+SQLAlchemy==1.3.16
+PyMySQL==0.9.3
\ No newline at end of file
diff --git a/src/main/java/org/apache/rocketmq/connect/cassandra/common/CloneUtils.java b/src/main/java/org/apache/rocketmq/connect/cassandra/common/CloneUtils.java
new file mode 100644
index 0000000..c860750
--- /dev/null
+++ b/src/main/java/org/apache/rocketmq/connect/cassandra/common/CloneUtils.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.connect.cassandra.common;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.io.Serializable;
+
+/**
+ * Deep-copy helper based on Java serialization.
+ */
+public class CloneUtils {
+    /**
+     * Creates a deep copy of {@code obj} by serializing it into an in-memory buffer and
+     * deserializing the result.
+     *
+     * @param obj the object to copy; may be {@code null}, in which case {@code null} is returned
+     * @param <T> the type of the object being copied
+     * @return the deep copy, or {@code null} if serialization or deserialization fails
+     *     (the failure is printed to standard error)
+     */
+    @SuppressWarnings("unchecked")
+    public static <T extends Serializable> T clone(T obj) {
+        ByteArrayOutputStream baos = new ByteArrayOutputStream();
+        // try-with-resources closes the stream on every path, including failures.
+        try (ObjectOutputStream oos = new ObjectOutputStream(baos)) {
+            oos.writeObject(obj);
+        } catch (Exception e) {
+            e.printStackTrace();
+            return null;
+        }
+
+        // The buffer is read only after the output stream has been closed, so it is complete.
+        try (ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(baos.toByteArray()))) {
+            return (T) ois.readObject();
+        } catch (Exception e) {
+            e.printStackTrace();
+            return null;
+        }
+    }
+}
diff --git a/src/main/java/org/apache/rocketmq/connect/cassandra/common/ConstDefine.java b/src/main/java/org/apache/rocketmq/connect/cassandra/common/ConstDefine.java
new file mode 100644
index 0000000..462add2
--- /dev/null
+++ b/src/main/java/org/apache/rocketmq/connect/cassandra/common/ConstDefine.java
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.connect.cassandra.common;
+
+/**
+ * String constants shared by the Cassandra connector classes.
+ */
+public class ConstDefine {
+
+    /**
+     * Prefix string {@code "CASSANDRA-CONNECTOR-ADMIN"}. Declared {@code final} so that this
+     * public constant cannot be reassigned at runtime.
+     */
+    public static final String CASSANDRA_CONNECTOR_ADMIN_PREFIX = "CASSANDRA-CONNECTOR-ADMIN";
+
+    /** Prefix string {@code "cassandra"}. */
+    public static final String PREFIX = "cassandra";
+}
diff --git a/src/main/java/org/apache/rocketmq/connect/cassandra/common/DBUtils.java b/src/main/java/org/apache/rocketmq/connect/cassandra/common/DBUtils.java
new file mode 100644
index 0000000..bd58eea
--- /dev/null
+++ b/src/main/java/org/apache/rocketmq/connect/cassandra/common/DBUtils.java
@@ -0,0 +1,91 @@
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.connect.cassandra.common;
+
+import com.datastax.oss.driver.api.core.CqlSession;
+import com.datastax.oss.driver.api.core.CqlSessionBuilder;
+import com.datastax.oss.driver.api.core.config.DefaultDriverOption;
+import com.datastax.oss.driver.api.core.config.DriverConfigLoader;
+import com.datastax.oss.driver.internal.core.auth.PlainTextAuthProvider;
+import io.netty.util.concurrent.SingleThreadEventExecutor;
+import java.io.File;
+import java.net.InetSocketAddress;
+import java.time.Duration;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import org.apache.rocketmq.connect.cassandra.config.Config;
+import org.apache.rocketmq.connect.cassandra.connector.CassandraSinkTask;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.text.SimpleDateFormat;
+import java.util.*;
+import java.util.Date;
+
+public class DBUtils {
+
+ private static final Logger log = LoggerFactory.getLogger(CassandraSinkTask.class);
+
+ public static CqlSession initCqlSession(Config config) throws Exception {
+ log.info("Trying to init Cql Session ");
+ Map<String, String> map = new HashMap<>();
+
+ String dbUrl = config.getDbUrl();
+ String dbPort = config.getDbPort();
+ String localDataCenter = config.getLocalDataCenter();
+ String username = config.getDbUsername();
+ String password = config.getDbPassword();
+
+// sessionBuilder.addContactPoint(new InetSocketAddress(dbUrl, Integer.parseInt(dbPort)))
+// .withAuthCredentials(username, password);
+
+
+ log.info("Cassandra dbUrl: {}", dbUrl);
+ log.info("Cassandra dbPort: {}", dbPort);
+ log.info("Cassandra datacenter: {}", localDataCenter);
+ log.info("Cassandra username: {}", username);
+ log.info("Cassandra password: {}", password);
+
+ CqlSession cqlSession = null;
+ log.info("Using Program Config Loader");
+ try {
+ ExecutorService executorService = Executors.newSingleThreadExecutor();
+ Future<CqlSession> handle = executorService.submit(new Callable<CqlSession>() {
+ @Override
+ public CqlSession call() {
+ return CqlSession.builder()
+ .addContactPoint(new InetSocketAddress(dbUrl, Integer.valueOf(dbPort)))
+ .withLocalDatacenter(localDataCenter)
+ .build();
+ }
+ });
+
+ cqlSession = handle.get();
+
+ } catch (Exception e) {
+ log.info("error when creating cqlSession {}", e.getMessage());
+ e.printStackTrace();
+ }
+ log.info("init Cql Session success");
+
+ return cqlSession;
+ }
+}
diff --git a/src/main/java/org/apache/rocketmq/connect/cassandra/common/DataType.java b/src/main/java/org/apache/rocketmq/connect/cassandra/common/DataType.java
new file mode 100644
index 0000000..d6f814f
--- /dev/null
+++ b/src/main/java/org/apache/rocketmq/connect/cassandra/common/DataType.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.connect.cassandra.common;
+
+/**
+ * Categories of data named by the connector.
+ *
+ * <p>NOTE(review): none of these constants is referenced in the visible part of this change;
+ * presumably they correspond to the {@code dataType} setting in {@code Config} -- confirm
+ * against the callers. The declaration order determines each constant's ordinal.
+ */
+public enum DataType {
+
+ COMMON_MESSAGE,
+ TOPIC_CONFIG,
+ BROKER_CONFIG,
+ SUB_CONFIG,
+ OFFSET
+}
diff --git a/src/main/java/org/apache/rocketmq/connect/cassandra/common/Utils.java b/src/main/java/org/apache/rocketmq/connect/cassandra/common/Utils.java
new file mode 100644
index 0000000..0911e20
--- /dev/null
+++ b/src/main/java/org/apache/rocketmq/connect/cassandra/common/Utils.java
@@ -0,0 +1,76 @@
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.connect.cassandra.common;
+
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.rocketmq.client.exception.MQClientException;
+import org.apache.rocketmq.common.protocol.route.BrokerData;
+import org.apache.rocketmq.common.protocol.route.TopicRouteData;
+import org.apache.rocketmq.remoting.exception.RemotingException;
+import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * Naming and route-lookup helpers used by the Cassandra connectors.
+ */
+public class Utils {
+    private static final Logger log = LoggerFactory.getLogger(Utils.class);
+
+    /**
+     * Builds a group name of the form {@code <prefix>-<current time in millis>}.
+     */
+    public static String createGroupName(String prefix) {
+        return prefix + "-" + System.currentTimeMillis();
+    }
+
+    /**
+     * Builds a group name of the form {@code <prefix>-<postfix>}.
+     */
+    public static String createGroupName(String prefix, String postfix) {
+        return prefix + "-" + postfix;
+    }
+
+    /**
+     * Builds a task id of the form {@code <prefix>-<current time in millis>}.
+     */
+    public static String createTaskId(String prefix) {
+        return prefix + "-" + System.currentTimeMillis();
+    }
+
+    /**
+     * Derives an instance name from a {@code ;}-separated name server address list.
+     *
+     * <p>Duplicates are dropped and the remaining addresses sorted, so the result does not
+     * depend on the order or repetition of the addresses.
+     *
+     * @param namesrvAddr name server addresses separated by {@code ;}
+     * @return the hash code of the sorted, de-duplicated address list, as a decimal string
+     */
+    public static String createInstanceName(String namesrvAddr) {
+        List<String> uniqueAddrs = new ArrayList<>();
+        for (String addr : namesrvAddr.split(";")) {
+            if (uniqueAddrs.contains(addr)) {
+                continue;
+            }
+            uniqueAddrs.add(addr);
+        }
+        Collections.sort(uniqueAddrs);
+        int hash = uniqueAddrs.toString().hashCode();
+        return String.valueOf(hash);
+    }
+
+    /**
+     * Returns the brokers on the route of {@code topic} that belong to {@code cluster}.
+     *
+     * @param defaultMQAdminExt admin client used to look up the topic route
+     * @param topic topic whose route is examined
+     * @param cluster cluster name a broker must have to be included
+     * @return the matching brokers; empty if the route lists no brokers or none match
+     */
+    public static List<BrokerData> examineBrokerData(DefaultMQAdminExt defaultMQAdminExt, String topic,
+        String cluster) throws RemotingException, MQClientException, InterruptedException {
+        List<BrokerData> matched = new ArrayList<>();
+
+        TopicRouteData route = defaultMQAdminExt.examineTopicRouteInfo(topic);
+        if (route.getBrokerDatas() == null) {
+            return matched;
+        }
+        for (BrokerData candidate : route.getBrokerDatas()) {
+            if (StringUtils.equals(candidate.getCluster(), cluster)) {
+                matched.add(candidate);
+            }
+        }
+        return matched;
+    }
+}
diff --git a/src/main/java/org/apache/rocketmq/connect/cassandra/config/Config.java b/src/main/java/org/apache/rocketmq/connect/cassandra/config/Config.java
new file mode 100644
index 0000000..b9b115e
--- /dev/null
+++ b/src/main/java/org/apache/rocketmq/connect/cassandra/config/Config.java
@@ -0,0 +1,282 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.connect.cassandra.config;
+
+import com.alibaba.fastjson.JSONObject;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+/**
+ * Holder for the settings of a Cassandra connector task, together with the
+ * key names used to look those settings up in a connector configuration.
+ */
+public class Config {
+    private static final Logger LOG = LoggerFactory.getLogger(Config.class);
+
+    /* Configuration keys */
+    public static final String CONN_TASK_PARALLELISM = "task-parallelism";
+    public static final String CONN_TASK_DIVIDE_STRATEGY = "task-divide-strategy";
+    public static final String CONN_WHITE_LIST = "whiteDataBase";
+    public static final String CONN_SOURCE_RECORD_CONVERTER = "source-record-converter";
+    public static final String CONN_DB_IP = "dbUrl";
+    public static final String CONN_DB_PORT = "dbPort";
+    public static final String CONN_DB_USERNAME = "dbUsername";
+    public static final String CONN_DB_PASSWORD = "dbPassword";
+    public static final String CONN_DB_DATACENTER = "localDataCenter";
+    public static final String CONN_DATA_TYPE = "dataType";
+    public static final String CONN_TOPIC_NAMES = "topicNames";
+    public static final String CONN_DB_MODE = "mode";
+
+    public static final String CONN_SOURCE_RMQ = "source-rocketmq";
+    public static final String CONN_SOURCE_CLUSTER = "source-cluster";
+    public static final String REFRESH_INTERVAL = "refresh.interval";
+
+    /** Keys whose presence is checked when a connector configuration is verified. */
+    public static final Set<String> REQUEST_CONFIG = new HashSet<String>() {
+        {
+            add("dbUrl");
+            add("dbPort");
+            add("localDataCenter");
+            add("dbUsername");
+            add("dbPassword");
+            add("mode");
+            add("rocketmqTopic");
+        }
+    };
+
+    /* Database Connection Config */
+    private String dbUrl;
+    private String dbPort;
+    private String localDataCenter;
+    private String dbUsername;
+    private String dbPassword;
+    private String dataType;
+    private String rocketmqTopic;
+
+    /* Table / database filters */
+    private List tableWhitelist;
+    private List tableBlacklist;
+    private String whiteDataBase;
+    private String whiteTable;
+
+    /* Mode Config */
+    private String mode = "";
+    private String incrementingColumnName = "";
+    private String query = "";
+    private String timestampColmnName = "";
+    private boolean validateNonNull = true;
+
+    /* Connector config */
+    private String tableTypes = "table";
+    private long pollInterval = 5000;
+    private int batchMaxRows = 100;
+    private long tablePollInterval = 60000;
+    private long timestampDelayInterval = 0;
+    private String dbTimezone = "GMT+8";
+    private String queueName;
+
+    private Logger log = LoggerFactory.getLogger(Config.class);
+
+    /** @return the class-level logger */
+    public static Logger getLOG() { return LOG; }
+
+    /* ---- Database connection accessors ---- */
+
+    public String getDbUrl() { return dbUrl; }
+
+    public void setDbUrl(String dbUrl) { this.dbUrl = dbUrl; }
+
+    public String getDbPort() { return dbPort; }
+
+    public void setDbPort(String dbPort) { this.dbPort = dbPort; }
+
+    public String getLocalDataCenter() { return localDataCenter; }
+
+    public void setLocalDataCenter(String localDataCenter) { this.localDataCenter = localDataCenter; }
+
+    public String getDbUsername() { return dbUsername; }
+
+    public void setDbUsername(String dbUsername) { this.dbUsername = dbUsername; }
+
+    public String getDbPassword() { return dbPassword; }
+
+    public void setDbPassword(String dbPassword) { this.dbPassword = dbPassword; }
+
+    public String getDataType() { return dataType; }
+
+    public void setDataType(String dataType) { this.dataType = dataType; }
+
+    public String getRocketmqTopic() { return rocketmqTopic; }
+
+    public void setRocketmqTopic(String rocketmqTopic) { this.rocketmqTopic = rocketmqTopic; }
+
+    /* ---- Mode accessors ---- */
+
+    public String getMode() { return mode; }
+
+    public void setMode(String mode) { this.mode = mode; }
+
+    public String getIncrementingColumnName() { return incrementingColumnName; }
+
+    public void setIncrementingColumnName(String incrementingColumnName) {
+        this.incrementingColumnName = incrementingColumnName;
+    }
+
+    public String getQuery() { return query; }
+
+    public void setQuery(String query) { this.query = query; }
+
+    public String getTimestampColmnName() { return timestampColmnName; }
+
+    public void setTimestampColmnName(String timestampColmnName) { this.timestampColmnName = timestampColmnName; }
+
+    public boolean isValidateNonNull() { return validateNonNull; }
+
+    public void setValidateNonNull(boolean validateNonNull) { this.validateNonNull = validateNonNull; }
+
+    /* ---- Connector accessors ---- */
+
+    public String getTableTypes() { return tableTypes; }
+
+    public void setTableTypes(String tableTypes) { this.tableTypes = tableTypes; }
+
+    public long getPollInterval() { return pollInterval; }
+
+    public void setPollInterval(long pollInterval) { this.pollInterval = pollInterval; }
+
+    public int getBatchMaxRows() { return batchMaxRows; }
+
+    public void setBatchMaxRows(int batchMaxRows) { this.batchMaxRows = batchMaxRows; }
+
+    public long getTablePollInterval() { return tablePollInterval; }
+
+    public void setTablePollInterval(long tablePollInterval) { this.tablePollInterval = tablePollInterval; }
+
+    public long getTimestampDelayInterval() { return timestampDelayInterval; }
+
+    public void setTimestampDelayInterval(long timestampDelayInterval) {
+        this.timestampDelayInterval = timestampDelayInterval;
+    }
+
+    public String getDbTimezone() { return dbTimezone; }
+
+    public void setDbTimezone(String dbTimezone) { this.dbTimezone = dbTimezone; }
+
+    public String getQueueName() { return queueName; }
+
+    public void setQueueName(String queueName) { this.queueName = queueName; }
+
+    public Logger getLog() { return log; }
+
+    public void setLog(Logger log) { this.log = log; }
+
+    /* ---- Table / database filter accessors ---- */
+
+    public List getTableWhitelist() { return tableWhitelist; }
+
+    public void setTableWhitelist(List tableWhitelist) { this.tableWhitelist = tableWhitelist; }
+
+    public List getTableBlacklist() { return tableBlacklist; }
+
+    public void setTableBlacklist(List tableBlacklist) { this.tableBlacklist = tableBlacklist; }
+
+    public String getWhiteDataBase() { return whiteDataBase; }
+
+    public void setWhiteDataBase(String whiteDataBase) { this.whiteDataBase = whiteDataBase; }
+
+    public String getWhiteTable() { return whiteTable; }
+
+    public void setWhiteTable(String whiteTable) { this.whiteTable = whiteTable; }
+
+}
\ No newline at end of file
diff --git a/src/main/java/org/apache/rocketmq/connect/cassandra/config/ConfigUtil.java b/src/main/java/org/apache/rocketmq/connect/cassandra/config/ConfigUtil.java
new file mode 100644
index 0000000..1c08fb2
--- /dev/null
+++ b/src/main/java/org/apache/rocketmq/connect/cassandra/config/ConfigUtil.java
@@ -0,0 +1,70 @@
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.connect.cassandra.config;
+
+import io.openmessaging.KeyValue;
+import java.lang.reflect.Method;
+
+/**
+ * Reflection helper that copies string properties onto a bean through its setters.
+ */
+public class ConfigUtil {
+    /**
+     * Populates {@code object} from {@code props}. For every public single-argument
+     * setter {@code setXyz} the property {@code xyz} is looked up; when present it is
+     * converted to the setter's parameter type (int, long, double, boolean, float,
+     * their wrappers, or String) and the setter is invoked. Setters of any other
+     * type are skipped, and a property that cannot be converted or applied is
+     * skipped without failing the whole load.
+     *
+     * @param props  source of the property values
+     * @param object bean whose setters are invoked
+     */
+    public static <T> void load(KeyValue props, Object object) {
+
+        properties2Object(props, object);
+    }
+
+    private static <T> void properties2Object(final KeyValue p, final Object object) {
+
+        for (Method method : object.getClass().getMethods()) {
+            String mn = method.getName();
+            Class<?>[] pt = method.getParameterTypes();
+            // Only "setX..." methods taking exactly one argument can be fed from a single property.
+            if (!mn.startsWith("set") || mn.length() <= 3 || pt.length != 1) {
+                continue;
+            }
+            // Character.toLowerCase does not depend on the default locale, unlike
+            // String.toLowerCase(), which maps 'I' to a dotless i under e.g. a Turkish locale.
+            String key = Character.toLowerCase(mn.charAt(3)) + mn.substring(4);
+            try {
+                String property = p.getString(key);
+                if (property == null) {
+                    continue;
+                }
+                Object arg = convert(pt[0], property);
+                if (arg == null) {
+                    continue;
+                }
+                method.invoke(object, arg);
+            } catch (Exception ignored) {
+                // Best effort: a malformed value or a failing setter must not prevent the
+                // remaining properties from being applied. JVM errors are no longer swallowed.
+            }
+        }
+    }
+
+    /**
+     * Converts {@code raw} to {@code type}.
+     *
+     * @return the converted value, or {@code null} when {@code type} is not supported
+     */
+    private static Object convert(Class<?> type, String raw) {
+        if (type == int.class || type == Integer.class) {
+            return Integer.parseInt(raw);
+        }
+        if (type == long.class || type == Long.class) {
+            return Long.parseLong(raw);
+        }
+        if (type == double.class || type == Double.class) {
+            return Double.parseDouble(raw);
+        }
+        if (type == boolean.class || type == Boolean.class) {
+            return Boolean.parseBoolean(raw);
+        }
+        if (type == float.class || type == Float.class) {
+            return Float.parseFloat(raw);
+        }
+        if (type == String.class) {
+            return raw;
+        }
+        return null;
+    }
+}
diff --git a/src/main/java/org/apache/rocketmq/connect/cassandra/config/DbConnectorConfig.java b/src/main/java/org/apache/rocketmq/connect/cassandra/config/DbConnectorConfig.java
new file mode 100644
index 0000000..3dd25c0
--- /dev/null
+++ b/src/main/java/org/apache/rocketmq/connect/cassandra/config/DbConnectorConfig.java
@@ -0,0 +1,110 @@
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.connect.cassandra.config;
+
+import io.openmessaging.KeyValue;
+import org.apache.rocketmq.connect.cassandra.strategy.TaskDivideStrategy;
+
+/**
+ * Base class for the connector-level settings shared by the sink and source
+ * connector configurations.
+ */
+public abstract class DbConnectorConfig {
+    public TaskDivideStrategy taskDivideStrategy;
+    public String dbUrl;
+    public String dbPort;
+    public String dbUserName;
+    public String dbPassword;
+    public String localDataCenter;
+    public String converter;
+    public int taskParallelism;
+    public String mode;
+
+    /**
+     * Loads and checks the connector settings held in {@code config}.
+     *
+     * @param config the connector configuration
+     */
+    public abstract void validate(KeyValue config);
+
+    /**
+     * @return the white-listed topics in the representation chosen by the subclass
+     */
+    public abstract <T> T getWhiteTopics();
+
+    public TaskDivideStrategy getTaskDivideStrategy() { return taskDivideStrategy; }
+
+    public void setTaskDivideStrategy(TaskDivideStrategy taskDivideStrategy) {
+        this.taskDivideStrategy = taskDivideStrategy;
+    }
+
+    public String getDbUrl() { return dbUrl; }
+
+    public void setDbUrl(String dbUrl) { this.dbUrl = dbUrl; }
+
+    public String getDbPort() { return dbPort; }
+
+    public void setDbPort(String dbPort) { this.dbPort = dbPort; }
+
+    public String getDbUserName() { return dbUserName; }
+
+    public void setDbUserName(String dbUserName) { this.dbUserName = dbUserName; }
+
+    public String getDbPassword() { return dbPassword; }
+
+    public void setDbPassword(String dbPassword) { this.dbPassword = dbPassword; }
+
+    public String getLocalDataCenter() { return localDataCenter; }
+
+    public void setLocalDataCenter(String localDataCenter) { this.localDataCenter = localDataCenter; }
+
+    public String getConverter() { return converter; }
+
+    public void setConverter(String converter) { this.converter = converter; }
+
+    public int getTaskParallelism() { return taskParallelism; }
+
+    public void setTaskParallelism(int taskParallelism) { this.taskParallelism = taskParallelism; }
+
+    public String getMode() { return mode; }
+
+    public void setMode(String mode) { this.mode = mode; }
+}
diff --git a/src/main/java/org/apache/rocketmq/connect/cassandra/config/SinkDbConnectorConfig.java b/src/main/java/org/apache/rocketmq/connect/cassandra/config/SinkDbConnectorConfig.java
new file mode 100644
index 0000000..3145033
--- /dev/null
+++ b/src/main/java/org/apache/rocketmq/connect/cassandra/config/SinkDbConnectorConfig.java
@@ -0,0 +1,112 @@
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.connect.cassandra.config;
+
+import io.openmessaging.KeyValue;
+import org.apache.rocketmq.connect.cassandra.strategy.DivideStrategyEnum;
+import org.apache.rocketmq.connect.cassandra.strategy.DivideTaskByTopic;
+
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+
+/**
+ * Connector-level settings of the Cassandra sink connector: the database
+ * connection, the source RocketMQ cluster and the topics to consume.
+ */
+public class SinkDbConnectorConfig extends DbConnectorConfig {
+    private Set<String> whiteList;
+    private String srcNamesrvs;
+    private String srcCluster;
+    private long refreshInterval;
+    private Map<String, Set<TaskTopicInfo>> topicRouteMap;
+
+    public SinkDbConnectorConfig(){
+    }
+
+    /**
+     * Reads the sink settings from {@code config}.
+     *
+     * @param config the connector configuration
+     * @throws IllegalArgumentException if no topic is configured under
+     *         {@link Config#CONN_TOPIC_NAMES} or the refresh interval is not positive
+     */
+    @Override
+    public void validate(KeyValue config) {
+        this.taskParallelism = config.getInt(Config.CONN_TASK_PARALLELISM, 1);
+
+        // The configured strategy is read but not consulted: DivideTaskByTopic is always used.
+        config.getInt(Config.CONN_TASK_DIVIDE_STRATEGY, DivideStrategyEnum.BY_TOPIC.ordinal());
+
+        this.taskDivideStrategy = new DivideTaskByTopic();
+
+        buildWhiteList(config);
+
+        this.converter = config.getString(Config.CONN_SOURCE_RECORD_CONVERTER);
+        this.dbUrl = config.getString(Config.CONN_DB_IP);
+        this.dbPort = config.getString(Config.CONN_DB_PORT);
+        this.dbUserName = config.getString(Config.CONN_DB_USERNAME);
+        this.dbPassword = config.getString(Config.CONN_DB_PASSWORD);
+        this.localDataCenter = config.getString(Config.CONN_DB_DATACENTER);
+        this.srcNamesrvs = config.getString(Config.CONN_SOURCE_RMQ);
+        this.srcCluster = config.getString(Config.CONN_SOURCE_CLUSTER);
+        this.refreshInterval = config.getLong(Config.REFRESH_INTERVAL, 3);
+        // The interval is later used as the period of a fixed-rate schedule, which
+        // rejects non-positive values; fail here with a clear message instead.
+        if (this.refreshInterval <= 0) {
+            throw new IllegalArgumentException("Refresh interval must be positive.");
+        }
+        this.mode = config.getString(Config.CONN_DB_MODE, "bulk");
+
+    }
+
+    /**
+     * Parses the comma-separated topic list into {@link #whiteList}, ignoring
+     * blank entries.
+     *
+     * @throws IllegalArgumentException if the list contains no topic
+     */
+    private void buildWhiteList(KeyValue config) {
+        Set<String> topics = new HashSet<>();
+        String whiteListStr = config.getString(Config.CONN_TOPIC_NAMES, "");
+        // "".split(",") yields one empty element, so emptiness must be checked per entry
+        // rather than on the array length.
+        for (String t : whiteListStr.split(",")) {
+            String topic = t.trim();
+            if (!topic.isEmpty()) {
+                topics.add(topic);
+            }
+        }
+        this.whiteList = topics;
+        if (topics.isEmpty()) {
+            throw new IllegalArgumentException("White list must be not empty.");
+        }
+    }
+
+
+    public Set<String> getWhiteList() {
+        return whiteList;
+    }
+
+    public void setWhiteList(Set<String> whiteList) {
+        this.whiteList = whiteList;
+    }
+
+    public String getSrcNamesrvs() {
+        return this.srcNamesrvs;
+    }
+
+    public String getSrcCluster() {
+        return this.srcCluster;
+    }
+
+    /** @return the route refresh interval read from {@link Config#REFRESH_INTERVAL} */
+    public long getRefreshInterval() {
+        return this.refreshInterval;
+    }
+
+    public Map<String, Set<TaskTopicInfo>> getTopicRouteMap() {
+        return topicRouteMap;
+    }
+
+    public void setTopicRouteMap(Map<String, Set<TaskTopicInfo>> topicRouteMap) {
+        this.topicRouteMap = topicRouteMap;
+    }
+
+    /** @return the white-listed topic names, same as {@link #getWhiteList()} */
+    @Override
+    public Set<String> getWhiteTopics() {
+        return getWhiteList();
+    }
+
+}
diff --git a/src/main/java/org/apache/rocketmq/connect/cassandra/config/SourceDbConnectorConfig.java b/src/main/java/org/apache/rocketmq/connect/cassandra/config/SourceDbConnectorConfig.java
new file mode 100644
index 0000000..6a3f685
--- /dev/null
+++ b/src/main/java/org/apache/rocketmq/connect/cassandra/config/SourceDbConnectorConfig.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.connect.cassandra.config;
+
+import com.alibaba.fastjson.JSONObject;
+import io.openmessaging.KeyValue;
+import org.apache.rocketmq.connect.cassandra.strategy.DivideStrategyEnum;
+import org.apache.rocketmq.connect.cassandra.strategy.DivideTaskByTopic;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Connector-level settings of the Cassandra source connector: the database
+ * connection and the white-listed databases and tables.
+ */
+public class SourceDbConnectorConfig extends DbConnectorConfig{
+
+    private Map<String, String> whiteMap;
+
+    public SourceDbConnectorConfig(){
+    }
+
+    /**
+     * Reads the source settings from {@code config}.
+     *
+     * @param config the connector configuration
+     * @throws IllegalArgumentException if {@link Config#CONN_WHITE_LIST} is missing,
+     *         empty, or contains an entry that is not a JSON object
+     */
+    @Override
+    public void validate(KeyValue config) {
+        this.taskParallelism = config.getInt(Config.CONN_TASK_PARALLELISM, 1);
+
+        // The configured strategy is read but not consulted: DivideTaskByTopic is always used.
+        config.getInt(Config.CONN_TASK_DIVIDE_STRATEGY, DivideStrategyEnum.BY_TOPIC.ordinal());
+
+        this.taskDivideStrategy = new DivideTaskByTopic();
+
+        buildWhiteMap(config);
+
+        this.converter = config.getString(Config.CONN_SOURCE_RECORD_CONVERTER);
+        this.dbUrl = config.getString(Config.CONN_DB_IP);
+        this.dbPort = config.getString(Config.CONN_DB_PORT);
+        this.dbUserName = config.getString(Config.CONN_DB_USERNAME);
+        this.dbPassword = config.getString(Config.CONN_DB_PASSWORD);
+        this.localDataCenter = config.getString(Config.CONN_DB_DATACENTER);
+        this.mode = config.getString(Config.CONN_DB_MODE, "bulk");
+
+    }
+
+    /**
+     * Flattens the JSON white list ({@code {db: {table: value}}}) into
+     * {@link #whiteMap}, keyed by {@code "<db>-<table>"}.
+     *
+     * @throws IllegalArgumentException if the white list is missing, empty or malformed
+     */
+    private void buildWhiteMap(KeyValue config) {
+        this.whiteMap = new HashMap<>(16);
+        String whiteListStr = config.getString(Config.CONN_WHITE_LIST, "");
+        JSONObject whiteDataBaseObject = JSONObject.parseObject(whiteListStr);
+        // parseObject returns null for empty text (the default when the key is absent),
+        // so guard against null before asking for the key set.
+        if (whiteDataBaseObject == null || whiteDataBaseObject.isEmpty()) {
+            throw new IllegalArgumentException("white data base must be not empty.");
+        }
+        for (String dbName : whiteDataBaseObject.keySet()) {
+            Object tables = whiteDataBaseObject.get(dbName);
+            if (!(tables instanceof JSONObject)) {
+                throw new IllegalArgumentException(
+                    "white data base entry '" + dbName + "' must be a JSON object of tables.");
+            }
+            JSONObject whiteTableObject = (JSONObject) tables;
+            for (String tableName : whiteTableObject.keySet()) {
+                String dbTableKey = dbName + "-" + tableName;
+                this.whiteMap.put(dbTableKey, whiteTableObject.getString(tableName));
+            }
+        }
+    }
+
+
+    public Map<String, String> getWhiteMap() {
+        return whiteMap;
+    }
+
+    public void setWhiteMap(Map<String, String> whiteMap) {
+        this.whiteMap = whiteMap;
+    }
+
+    /** @return the flattened white list, same as {@link #getWhiteMap()} */
+    @Override
+    public Map<String, String> getWhiteTopics() {
+        return getWhiteMap();
+    }
+
+}
diff --git a/src/main/java/org/apache/rocketmq/connect/cassandra/config/TaskDivideConfig.java b/src/main/java/org/apache/rocketmq/connect/cassandra/config/TaskDivideConfig.java
new file mode 100644
index 0000000..7c43137
--- /dev/null
+++ b/src/main/java/org/apache/rocketmq/connect/cassandra/config/TaskDivideConfig.java
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.connect.cassandra.config;
+
+/**
+ * Bundle of the settings handed to a task divide strategy: database connection
+ * details, record converter, data type, task parallelism and mode.
+ */
+public class TaskDivideConfig {
+
+    /* Database connection */
+    private String dbUrl;
+    private String dbPort;
+    private String dbUserName;
+    private String dbPassword;
+    private String localDataCenter;
+
+    /* Task settings */
+    private String srcRecordConverter;
+    private int dataType;
+    private int taskParallelism;
+    private String mode;
+
+    /**
+     * Creates a config with every setting supplied up front.
+     */
+    public TaskDivideConfig(String dbUrl, String dbPort, String dbUserName, String dbPassword, String localDataCenter,
+        String srcRecordConverter, int dataType, int taskParallelism, String mode) {
+        setDbUrlInternal(dbUrl);
+        this.dbPort = dbPort;
+        this.dbUserName = dbUserName;
+        this.dbPassword = dbPassword;
+        this.localDataCenter = localDataCenter;
+        this.srcRecordConverter = srcRecordConverter;
+        this.dataType = dataType;
+        this.taskParallelism = taskParallelism;
+        this.mode = mode;
+    }
+
+    // Private so the constructor never dispatches to an overridable setter.
+    private void setDbUrlInternal(String url) {
+        this.dbUrl = url;
+    }
+
+    public String getDbUrl() { return dbUrl; }
+
+    public void setDbUrl(String dbUrl) { this.dbUrl = dbUrl; }
+
+    public String getDbPort() { return dbPort; }
+
+    public void setDbPort(String dbPort) { this.dbPort = dbPort; }
+
+    public String getDbUserName() { return dbUserName; }
+
+    public void setDbUserName(String dbUserName) { this.dbUserName = dbUserName; }
+
+    public String getDbPassword() { return dbPassword; }
+
+    public void setDbPassword(String dbPassword) { this.dbPassword = dbPassword; }
+
+    public String getLocalDataCenter() { return localDataCenter; }
+
+    public void setLocalDataCenter(String localDataCenter) { this.localDataCenter = localDataCenter; }
+
+    public String getSrcRecordConverter() { return srcRecordConverter; }
+
+    public void setSrcRecordConverter(String srcRecordConverter) { this.srcRecordConverter = srcRecordConverter; }
+
+    public int getDataType() { return dataType; }
+
+    public void setDataType(int dataType) { this.dataType = dataType; }
+
+    public int getTaskParallelism() { return taskParallelism; }
+
+    public void setTaskParallelism(int taskParallelism) { this.taskParallelism = taskParallelism; }
+
+    public String getMode() { return mode; }
+
+    public void setMode(String mode) { this.mode = mode; }
+}
diff --git a/src/main/java/org/apache/rocketmq/connect/cassandra/config/TaskTopicInfo.java b/src/main/java/org/apache/rocketmq/connect/cassandra/config/TaskTopicInfo.java
new file mode 100644
index 0000000..074faab
--- /dev/null
+++ b/src/main/java/org/apache/rocketmq/connect/cassandra/config/TaskTopicInfo.java
@@ -0,0 +1,40 @@
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.rocketmq.connect.cassandra.config;
+
+import org.apache.rocketmq.common.message.MessageQueue;
+
+/**
+ * A message queue (source topic, broker name, queue id) paired with the name
+ * of a target topic.
+ */
+public class TaskTopicInfo extends MessageQueue {
+
+    private String targetTopic;
+
+    /**
+     * @param sourceTopic topic of the underlying message queue
+     * @param brokerName  broker of the underlying message queue
+     * @param queueId     id of the underlying message queue
+     * @param targetTopic target topic associated with the queue
+     */
+    public TaskTopicInfo(String sourceTopic, String brokerName, int queueId, String targetTopic) {
+        super(sourceTopic, brokerName, queueId);
+        setTargetTopicValue(targetTopic);
+    }
+
+    // Private so the constructor never dispatches to an overridable setter.
+    private void setTargetTopicValue(String value) {
+        targetTopic = value;
+    }
+
+    public String getTargetTopic() { return targetTopic; }
+
+    public void setTargetTopic(String targetTopic) { this.targetTopic = targetTopic; }
+}
diff --git a/src/main/java/org/apache/rocketmq/connect/cassandra/connector/CassandraSinkConnector.java b/src/main/java/org/apache/rocketmq/connect/cassandra/connector/CassandraSinkConnector.java
new file mode 100644
index 0000000..6ce23f6
--- /dev/null
+++ b/src/main/java/org/apache/rocketmq/connect/cassandra/connector/CassandraSinkConnector.java
@@ -0,0 +1,240 @@
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.connect.cassandra.connector;
+
+
+import io.openmessaging.KeyValue;
+import io.openmessaging.connector.api.Task;
+import io.openmessaging.connector.api.sink.SinkConnector;
+import java.util.concurrent.ScheduledFuture;
+import org.apache.commons.lang3.concurrent.BasicThreadFactory;
+import org.apache.rocketmq.client.exception.MQClientException;
+import org.apache.rocketmq.common.protocol.route.BrokerData;
+import org.apache.rocketmq.common.protocol.route.QueueData;
+import org.apache.rocketmq.common.protocol.route.TopicRouteData;
+import org.apache.rocketmq.connect.cassandra.common.CloneUtils;
+import org.apache.rocketmq.connect.cassandra.common.ConstDefine;
+import org.apache.rocketmq.connect.cassandra.common.DataType;
+import org.apache.rocketmq.connect.cassandra.common.Utils;
+import org.apache.rocketmq.connect.cassandra.config.*;
+import org.apache.rocketmq.remoting.RPCHook;
+import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.*;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+
+public class CassandraSinkConnector extends SinkConnector{
+ private static final Logger log = LoggerFactory.getLogger(CassandraSinkConnector.class);
+ private DbConnectorConfig dbConnectorConfig;
+ private volatile boolean configValid = false;
+ private ScheduledExecutorService executor;
+ private HashMap<String, Set<TaskTopicInfo>> topicRouteMap;
+
+ private DefaultMQAdminExt srcMQAdminExt;
+
+ private volatile boolean adminStarted;
+
+ private ScheduledFuture<?> listenerHandle;
+
+ public CassandraSinkConnector() {
+ topicRouteMap = new HashMap<>();
+ dbConnectorConfig = new SinkDbConnectorConfig();
+ executor = Executors.newSingleThreadScheduledExecutor(new BasicThreadFactory.Builder().namingPattern("CassandraSinkConnector-SinkWatcher-%d").daemon(true).build());
+ }
+
+ private synchronized void startMQAdminTools() {
+ if (!configValid || adminStarted) {
+ return;
+ }
+ RPCHook rpcHook = null;
+ this.srcMQAdminExt = new DefaultMQAdminExt(rpcHook);
+ this.srcMQAdminExt.setNamesrvAddr(((SinkDbConnectorConfig) this.dbConnectorConfig).getSrcNamesrvs());
+ this.srcMQAdminExt.setAdminExtGroup(Utils.createGroupName(ConstDefine.CASSANDRA_CONNECTOR_ADMIN_PREFIX));
+ this.srcMQAdminExt.setInstanceName(Utils.createInstanceName(((SinkDbConnectorConfig) this.dbConnectorConfig).getSrcNamesrvs()));
+
+ try {
+ log.info("Trying to start srcMQAdminExt");
+ this.srcMQAdminExt.start();
+ log.info("RocketMQ srcMQAdminExt started");
+
+ } catch (MQClientException e) {
+ log.error("Cassandra Sink Task start failed for `srcMQAdminExt` exception.", e);
+ }
+
+ adminStarted = true;
+ }
+
+ @Override
+ public String verifyAndSetConfig(KeyValue config) {
+ for (String requestKey : Config.REQUEST_CONFIG) {
+ if (!config.containsKey(requestKey)) {
+ return "Request config key: " + requestKey;
+ }
+ }
+ try {
+ this.dbConnectorConfig.validate(config);
+ } catch (IllegalArgumentException e) {
+ return e.getMessage();
+ }
+ this.configValid = true;
+
+ return "";
+ }
+
+ @Override
+ public void start() {
+ startMQAdminTools();
+ startListener();
+ }
+
+ public void startListener() {
+ listenerHandle = executor.scheduleAtFixedRate(new Runnable() {
+ boolean first = true;
+ HashMap<String, Set<TaskTopicInfo>> origin = null;
+
+ @Override
+ public void run() {
+ buildRoute();
+ if (first) {
+ origin = CloneUtils.clone(topicRouteMap);
+ first = false;
+ }
+ if (!compare(origin, topicRouteMap)) {
+ context.requestTaskReconfiguration();
+ origin = CloneUtils.clone(topicRouteMap);
+ }
+ }
+ }, ((SinkDbConnectorConfig) dbConnectorConfig).getRefreshInterval(), ((SinkDbConnectorConfig) dbConnectorConfig).getRefreshInterval(), TimeUnit.SECONDS);
+ }
+
+ public boolean compare(Map<String, Set<TaskTopicInfo>> origin, Map<String, Set<TaskTopicInfo>> updated) {
+ if (origin.size() != updated.size()) {
+ return false;
+ }
+ for (Map.Entry<String, Set<TaskTopicInfo>> entry : origin.entrySet()) {
+ if (!updated.containsKey(entry.getKey())) {
+ return false;
+ }
+ Set<TaskTopicInfo> originTasks = entry.getValue();
+ Set<TaskTopicInfo> updateTasks = updated.get(entry.getKey());
+ if (originTasks.size() != updateTasks.size()) {
+ return false;
+ }
+
+ if (!originTasks.containsAll(updateTasks)) {
+ return false;
+ }
+ }
+
+ return true;
+ }
+
+ public void buildRoute() {
+ String srcCluster = ((SinkDbConnectorConfig) this.dbConnectorConfig).getSrcCluster();
+ try {
+ for (String topic : ((SinkDbConnectorConfig) this.dbConnectorConfig).getWhiteList()) {
+
+ // different from BrokerData with cluster field, which can ensure the brokerData is from expected cluster.
+ // QueueData use brokerName as unique info on cluster of rocketmq. so when we want to get QueueData of
+ // expected cluster, we should get brokerNames of expected cluster, and then filter queueDatas.
+ List<BrokerData> brokerList = Utils.examineBrokerData(this.srcMQAdminExt, topic, srcCluster);
+ Set<String> brokerNameSet = new HashSet<String>();
+ for (BrokerData b : brokerList) {
+ brokerNameSet.add(b.getBrokerName());
+ }
+
+ TopicRouteData topicRouteData = srcMQAdminExt.examineTopicRouteInfo(topic);
+ if (!topicRouteMap.containsKey(topic)) {
+ topicRouteMap.put(topic, new HashSet<>(16));
+ }
+ for (QueueData qd : topicRouteData.getQueueDatas()) {
+ if (brokerNameSet.contains(qd.getBrokerName())) {
+ for (int i = 0; i < qd.getReadQueueNums(); i++) {
+ TaskTopicInfo taskTopicInfo = new TaskTopicInfo(topic, qd.getBrokerName(), i, null);
+ topicRouteMap.get(topic).add(taskTopicInfo);
+ }
+ }
+ }
+ }
+ } catch (Exception e) {
+ log.error("Fetch topic list error.", e);
+ } finally {
+ // srcMQAdminExt.shutdown();
+ }
+ }
+
+
+ /**
+ * We need to reason why we don't call srcMQAdminExt.shutdown() here, and why
+ * it can be applied to srcMQAdminExt
+ */
+ @Override
+ public void stop() {
+ listenerHandle.cancel(true);
+ // srcMQAdminExt.shutdown();
+ }
+
+ @Override
+ public void pause() {
+
+ }
+
+ @Override
+ public void resume() {
+
+ }
+
+ @Override
+ public Class<? extends Task> taskClass() {
+ return CassandraSinkTask.class;
+ }
+
+ @Override
+ public List<KeyValue> taskConfigs() {
+ log.info("List.start");
+ if (!configValid) {
+ return new ArrayList<KeyValue>();
+ }
+
+ startMQAdminTools();
+
+ buildRoute();
+
+ TaskDivideConfig tdc = new TaskDivideConfig(
+ this.dbConnectorConfig.getDbUrl(),
+ this.dbConnectorConfig.getDbPort(),
+ this.dbConnectorConfig.getDbUserName(),
+ this.dbConnectorConfig.getDbPassword(),
+ this.dbConnectorConfig.getLocalDataCenter(),
+ this.dbConnectorConfig.getConverter(),
+ DataType.COMMON_MESSAGE.ordinal(),
+ this.dbConnectorConfig.getTaskParallelism(),
+ this.dbConnectorConfig.getMode()
+ );
+
+ ((SinkDbConnectorConfig) this.dbConnectorConfig).setTopicRouteMap(topicRouteMap);
+
+ return this.dbConnectorConfig.getTaskDivideStrategy().divide(this.dbConnectorConfig, tdc);
+ }
+}
diff --git a/src/main/java/org/apache/rocketmq/connect/cassandra/connector/CassandraSinkTask.java b/src/main/java/org/apache/rocketmq/connect/cassandra/connector/CassandraSinkTask.java
new file mode 100644
index 0000000..a8e9b0a
--- /dev/null
+++ b/src/main/java/org/apache/rocketmq/connect/cassandra/connector/CassandraSinkTask.java
@@ -0,0 +1,161 @@
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.connect.cassandra.connector;
+
+import com.alibaba.fastjson.JSONObject;
+
+import com.datastax.oss.driver.api.core.CqlSession;
+import io.openmessaging.KeyValue;
+import io.openmessaging.connector.api.common.QueueMetaData;
+import io.openmessaging.connector.api.data.EntryType;
+import io.openmessaging.connector.api.data.Field;
+import io.openmessaging.connector.api.data.Schema;
+import io.openmessaging.connector.api.data.SinkDataEntry;
+import io.openmessaging.connector.api.sink.SinkTask;
+import org.apache.rocketmq.connect.cassandra.config.Config;
+import org.apache.rocketmq.connect.cassandra.common.DBUtils;
+import org.apache.rocketmq.connect.cassandra.config.ConfigUtil;
+import org.apache.rocketmq.connect.cassandra.sink.Updater;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+
+
+/**
+ * In the naming, we are using database for "keyspaces" and table for "columnFamily"
+ * This is because we kind of want the abstract data source to be aligned with SQL databases
+ */
+public class CassandraSinkTask extends SinkTask {
+ private static final Logger log = LoggerFactory.getLogger(CassandraSinkTask.class);
+
+ private Config config;
+
+ private CqlSession cqlSession;
+ private Updater updater;
+ private BlockingQueue<Updater> tableQueue = new LinkedBlockingQueue<Updater>();
+
+ public CassandraSinkTask() {
+ this.config = new Config();
+ }
+
+ @Override
+ public void put(Collection<SinkDataEntry> sinkDataEntries) {
+ try {
+ if (tableQueue.size() > 1) {
+ updater = tableQueue.poll(1000, TimeUnit.MILLISECONDS);
+ } else {
+ updater = tableQueue.peek();
+ }
+ log.info("Cassandra Sink Task trying to put()");
+ for (SinkDataEntry record : sinkDataEntries) {
+ Map<Field, Object[]> fieldMap = new HashMap<>();
+ Object[] payloads = record.getPayload();
+ Schema schema = record.getSchema();
+ EntryType entryType = record.getEntryType();
+ String cfName = schema.getName();
+ String keyspaceName = schema.getDataSource();
+ List<Field> fields = schema.getFields();
+ Boolean parseError = false;
+ if (!fields.isEmpty()) {
+ for (Field field : fields) {
+ Object fieldValue = payloads[field.getIndex()];
+ Object[] value = JSONObject.parseArray((String)fieldValue).toArray();
+ if (value.length == 2) {
+ fieldMap.put(field, value);
+ } else {
+ log.error("parseArray error, fieldValue:{}", fieldValue);
+ parseError = true;
+ }
+ }
+ }
+ if (!parseError) {
+ log.info("Cassandra Sink Task trying to call updater.push()");
+ Boolean isSuccess = updater.push(keyspaceName, cfName, fieldMap, entryType);
+ if (!isSuccess) {
+ log.error("push data error, keyspaceName:{}, cfName:{}, entryType:{}, fieldMap:{}", keyspaceName, cfName, fieldMap, entryType);
+ }
+ }
+ }
+ } catch (Exception e) {
+ log.error("put sinkDataEntries error, {}", e);
+ }
+ }
+
+ @Override
+ public void commit(Map<QueueMetaData, Long> map) {
+
+ }
+
+ /**
+ * Remember always close the CqlSession according to
+ * https://docs.datastax.com/en/developer/java-driver/4.5/manual/core/
+ * @param props
+ */
+ @Override
+ public void start(KeyValue props) {
+ try {
+ ConfigUtil.load(props, this.config);
+ cqlSession = DBUtils.initCqlSession(config);
+ log.info("init data source success");
+ } catch (Exception e) {
+ log.error("Cannot start Cassandra Sink Task because of configuration error{}", e);
+ }
+ String mode = config.getMode();
+ if (mode.equals("bulk")) {
+ Updater updater = new Updater(config, cqlSession);
+ try {
+ updater.start();
+ tableQueue.add(updater);
+ } catch (Exception e) {
+ log.error("fail to start updater{}", e);
+ }
+ }
+
+ }
+
+ @Override
+ public void stop() {
+ try {
+ if (cqlSession != null){
+ cqlSession.close();
+ log.info("cassandra sink task connection is closed.");
+ }
+ } catch (Throwable e) {
+ log.warn("sink task stop error while closing connection to {}", "cassandra", e);
+ }
+ }
+
+ @Override
+ public void pause() {
+
+ }
+
+ @Override
+ public void resume() {
+
+ }
+}
diff --git a/src/main/java/org/apache/rocketmq/connect/cassandra/connector/CassandraSourceConnector.java b/src/main/java/org/apache/rocketmq/connect/cassandra/connector/CassandraSourceConnector.java
new file mode 100644
index 0000000..a8adc74
--- /dev/null
+++ b/src/main/java/org/apache/rocketmq/connect/cassandra/connector/CassandraSourceConnector.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.connect.cassandra.connector;
+
+import io.openmessaging.KeyValue;
+import io.openmessaging.connector.api.Task;
+import io.openmessaging.connector.api.source.SourceConnector;
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.rocketmq.connect.cassandra.common.DataType;
+import org.apache.rocketmq.connect.cassandra.config.Config;
+import org.apache.rocketmq.connect.cassandra.config.DbConnectorConfig;
+import org.apache.rocketmq.connect.cassandra.config.SourceDbConnectorConfig;
+import org.apache.rocketmq.connect.cassandra.config.TaskDivideConfig;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Source connector for Cassandra: validates the connector configuration and
+ * derives the configurations of its {@link CassandraSourceTask}s from it.
+ */
+public class CassandraSourceConnector extends SourceConnector {
+    private static final Logger log = LoggerFactory.getLogger(CassandraSourceConnector.class);
+    private DbConnectorConfig dbConnectorConfig;
+    private volatile boolean configValid = false;
+
+    public CassandraSourceConnector() {
+        this.dbConnectorConfig = new SourceDbConnectorConfig();
+    }
+
+    /**
+     * Checks that every key of {@code Config.REQUEST_CONFIG} is present and lets the
+     * connector configuration validate the supplied values.
+     *
+     * @param config the configuration to verify
+     * @return an empty string on success, otherwise a description of the problem
+     */
+    @Override
+    public String verifyAndSetConfig(KeyValue config) {
+        log.info("CassandraSourceConnector verifyAndSetConfig enter");
+
+        for (String requestKey : Config.REQUEST_CONFIG) {
+            if (config.containsKey(requestKey)) {
+                continue;
+            }
+            return "Request config key: " + requestKey;
+        }
+
+        String problem = "";
+        try {
+            this.dbConnectorConfig.validate(config);
+            // Only reached when validation did not reject the configuration.
+            this.configValid = true;
+        } catch (IllegalArgumentException e) {
+            problem = e.getMessage();
+        }
+        return problem;
+    }
+
+    @Override
+    public void start() {
+        // Nothing to start at connector level.
+    }
+
+    @Override
+    public void stop() {
+        // Nothing to stop at connector level.
+    }
+
+    @Override
+    public void pause() {
+        // No action on pause.
+    }
+
+    @Override
+    public void resume() {
+        // No action on resume.
+    }
+
+    @Override
+    public Class<? extends Task> taskClass() {
+        return CassandraSourceTask.class;
+    }
+
+    /**
+     * Builds the per-task configurations through the configured divide strategy.
+     *
+     * @return the task configurations, or an empty list if the configuration is invalid
+     */
+    @Override
+    public List<KeyValue> taskConfigs() {
+        log.info("List.start");
+        if (!configValid) {
+            return new ArrayList<>();
+        }
+
+        final DbConnectorConfig connectorConfig = this.dbConnectorConfig;
+        final TaskDivideConfig divideConfig = new TaskDivideConfig(
+            connectorConfig.getDbUrl(),
+            connectorConfig.getDbPort(),
+            connectorConfig.getDbUserName(),
+            connectorConfig.getDbPassword(),
+            connectorConfig.getLocalDataCenter(),
+            connectorConfig.getConverter(),
+            DataType.COMMON_MESSAGE.ordinal(),
+            connectorConfig.getTaskParallelism(),
+            connectorConfig.getMode()
+        );
+        return connectorConfig.getTaskDivideStrategy().divide(connectorConfig, divideConfig);
+    }
+
+}
diff --git a/src/main/java/org/apache/rocketmq/connect/cassandra/connector/CassandraSourceTask.java b/src/main/java/org/apache/rocketmq/connect/cassandra/connector/CassandraSourceTask.java
new file mode 100644
index 0000000..cac44ed
--- /dev/null
+++ b/src/main/java/org/apache/rocketmq/connect/cassandra/connector/CassandraSourceTask.java
@@ -0,0 +1,168 @@
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.connect.cassandra.connector;
+
+import com.alibaba.fastjson.JSON;
+import com.alibaba.fastjson.JSONObject;
+import com.datastax.oss.driver.api.core.CqlSession;
+import io.openmessaging.KeyValue;
+import io.openmessaging.connector.api.data.DataEntryBuilder;
+import io.openmessaging.connector.api.data.EntryType;
+import io.openmessaging.connector.api.data.Field;
+import io.openmessaging.connector.api.data.Schema;
+import io.openmessaging.connector.api.data.SourceDataEntry;
+import io.openmessaging.connector.api.source.SourceTask;
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.Timer;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import javax.sql.DataSource;
+
+import org.apache.rocketmq.connect.cassandra.common.ConstDefine;
+import org.apache.rocketmq.connect.cassandra.common.DBUtils;
+import org.apache.rocketmq.connect.cassandra.config.Config;
+import org.apache.rocketmq.connect.cassandra.config.ConfigUtil;
+import org.apache.rocketmq.connect.cassandra.schema.Table;
+import org.apache.rocketmq.connect.cassandra.schema.column.ColumnParser;
+import org.apache.rocketmq.connect.cassandra.source.Querier;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class CassandraSourceTask extends SourceTask {
+
+ private static final Logger log = LoggerFactory.getLogger(org.apache.rocketmq.connect.cassandra.connector.CassandraSourceTask.class);
+
+ private Config config;
+
+ private DataSource dataSource;
+
+ private CqlSession cqlSession;
+
+ BlockingQueue<Querier> tableQueue = new LinkedBlockingQueue<Querier>();
+ static final String INCREMENTING_FIELD = "incrementing";
+ static final String TIMESTAMP_FIELD = "timestamp";
+ private Querier querier;
+
+ public CassandraSourceTask() {
+ this.config = new Config();
+ }
+
+ @Override
+ public Collection<SourceDataEntry> poll() {
+ List<SourceDataEntry> res = new ArrayList<>();
+ try {
+ if (tableQueue.size() > 1)
+ querier = tableQueue.poll(1000, TimeUnit.MILLISECONDS);
+ else
+ querier = tableQueue.peek();
+ Timer timer = new Timer();
+ try {
+ Thread.currentThread();
+ Thread.sleep(1000);//毫秒
+ } catch (Exception e) {
+ throw e;
+ }
+ querier.poll();
+ for (Table dataRow : querier.getList()) {
+ JSONObject jsonObject = new JSONObject();
+ jsonObject.put("nextQuery", "database");
+ jsonObject.put("nextPosition", "table");
+ Schema schema = new Schema();
+ schema.setDataSource(dataRow.getDatabase());
+ schema.setName(dataRow.getName());
+ schema.setFields(new ArrayList<>());
+ for (int i = 0; i < dataRow.getColList().size(); i++) {
+ String columnName = dataRow.getColList().get(i);
+ String rawDataType = dataRow.getRawDataTypeList().get(i);
+ Field field = new Field(i, columnName, ColumnParser.mapConnectorFieldType(rawDataType));
+ schema.getFields().add(field);
+ }
+ DataEntryBuilder dataEntryBuilder = new DataEntryBuilder(schema);
+ dataEntryBuilder.timestamp(System.currentTimeMillis()).queue(dataRow.getName())
+ .entryType(EntryType.UPDATE);
+ for (int i = 0; i < dataRow.getColList().size(); i++) {
+ Object[] value = new Object[2];
+ value[0] = value[1] = dataRow.getParserList().get(i).getValue(dataRow.getDataList().get(i));
+ dataEntryBuilder.putFiled(dataRow.getColList().get(i), JSONObject.toJSONString(value));
+ }
+
+ SourceDataEntry sourceDataEntry = dataEntryBuilder.buildSourceDataEntry(
+ ByteBuffer.wrap((ConstDefine.PREFIX + config.getDbUrl() + config.getDbPort()).getBytes(StandardCharsets.UTF_8)),
+ ByteBuffer.wrap(jsonObject.toJSONString().getBytes(StandardCharsets.UTF_8)));
+ res.add(sourceDataEntry);
+ log.debug("sourceDataEntry : {}", JSONObject.toJSONString(sourceDataEntry));
+ }
+ } catch (Exception e) {
+ log.error("Cassandra task poll error, current config:" + JSON.toJSONString(config), e);
+ }
+ log.debug("dataEntry poll successfully,{}", JSONObject.toJSONString(res));
+ return res;
+ }
+
+ @Override
+ public void start(KeyValue props) {
+ try {
+ ConfigUtil.load(props, this.config);
+ cqlSession = DBUtils.initCqlSession(config);
+ log.info("init data source success");
+ } catch (Exception e) {
+ log.error("Cannot start Cassandra Source Task because of configuration error{}", e);
+ }
+ Map<Map<String, String>, Map<String, Object>> offsets = null;
+ String mode = config.getMode();
+ if (mode.equals("bulk")) {
+ Querier querier = new Querier(config, cqlSession);
+ try {
+ querier.start();
+ tableQueue.add(querier);
+ } catch (Exception e) {
+ log.error("start querier failed in bulk mode{}", e);
+ }
+ }
+
+ }
+
+ @Override
+ public void stop() {
+ try {
+ if (cqlSession != null) {
+ cqlSession.close();
+ log.info("Cassandra source task connection is closed.");
+ }
+ } catch (Throwable e) {
+ log.warn("source task stop error while closing connection to {}", "Cassandra", e);
+ }
+ }
+
+ @Override
+ public void pause() {
+
+ }
+
+ @Override
+ public void resume() {
+
+ }
+}
diff --git a/src/main/java/org/apache/rocketmq/connect/cassandra/schema/Database.java b/src/main/java/org/apache/rocketmq/connect/cassandra/schema/Database.java
new file mode 100644
index 0000000..c8f69e6
--- /dev/null
+++ b/src/main/java/org/apache/rocketmq/connect/cassandra/schema/Database.java
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.rocketmq.connect.cassandra.schema;
+
+import com.datastax.oss.driver.api.core.CqlSession;
+import com.datastax.oss.driver.api.core.cql.ResultSet;
+import com.datastax.oss.driver.api.core.cql.Row;
+import com.datastax.oss.driver.api.core.cql.SimpleStatement;
+import com.datastax.oss.driver.api.querybuilder.QueryBuilder;
+import com.datastax.oss.driver.api.querybuilder.select.Select;
+import com.datastax.oss.driver.api.querybuilder.select.SelectFrom;
+import com.datastax.oss.driver.api.querybuilder.term.Term;
+import io.openmessaging.connector.api.data.FieldType;
+import org.apache.rocketmq.connect.cassandra.schema.column.ColumnParser;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+
+public class Database {
+ private static final Logger LOGGER = LoggerFactory.getLogger(Database.class);
+
+ private static final String CQL = "SELECT * FROM system_schema.columns \n" +
+ "WHERE keyspace_name = 'keyspace_name' AND table_name = 'table_name'";
+
+ private static final String SCHEMA_SYSTEM_SCHEMA = "system_schema";
+
+ private static final String TABLE_COLUMNS = "columns";
+
+ private static final String COLUMN_KEYSPACE_NAME = "keyspace_name";
+
+ private static final String COLUMN_TABLE_NAME = "table_name";
+
+ private static final String COLUMN_COLUMN_NAME = "column_name";
+
+ private static final String COLUMN_TYPE = "type";
+
+ private static final String GENERAL_CHARSET = "utf-8";
+
+ private String name;
+
+ private CqlSession cqlSession;
+
+ private Map<String, Table> tableMap = new HashMap<String, Table>();
+
+ public Set<String> tableWhiteList;
+
+ public Map<String, Map<String, String>> tableFilterMap;
+
+ public Database(String name, CqlSession cqlSession, Set<String> tableWhiteList, Map<String, Map<String, String>> tableFilterMap) {
+ this.name = name;
+ this.cqlSession = cqlSession;
+ this.tableWhiteList = tableWhiteList;
+ this.tableFilterMap = tableFilterMap;
+
+ }
+
+ public void init(){
+ Select selectFrom = QueryBuilder.selectFrom(SCHEMA_SYSTEM_SCHEMA, TABLE_COLUMNS)
+ .all()
+ .whereColumn(COLUMN_KEYSPACE_NAME)
+ .isEqualTo(QueryBuilder.literal(name));
+
+
+ SimpleStatement stmt;
+ boolean finishUpdate = false;
+ LOGGER.info("trying to execute sql query,{}", selectFrom.asCql());
+ ResultSet result = null;
+ try {
+ while (!cqlSession.isClosed() && !finishUpdate){
+ stmt = selectFrom.build();
+ result = cqlSession.execute(stmt);
+ if (result.wasApplied()) {
+ LOGGER.info("query columns success, executed cql query {}", selectFrom.asCql());
+ }
+ finishUpdate = true;
+ }
+
+ for (Row row : result) {
+ String tableName = row.getString(COLUMN_TABLE_NAME);
+ String columnName = row.getString(COLUMN_COLUMN_NAME);
+ String columnType = row.getString(COLUMN_TYPE);
+
+
+
+ ColumnParser columnParser = ColumnParser.getColumnParser(columnType, columnType, GENERAL_CHARSET);
+
+ if (!tableWhiteList.contains(tableName)){
+ continue;
+ }
+ if (!tableMap.containsKey(tableName)) {
+ addTable(tableName);
+ }
+ Table table = tableMap.get(tableName);
+ table.addCol(columnName);
+ table.addParser(columnParser);
+ table.addRawDataType(columnType);
+ table.setFilterMap(tableFilterMap.get(tableName));
+ }
+ } catch (Exception e) {
+ LOGGER.error("init cassandra Schema failure,{}", e);
+ }
+
+ }
+
+ private void addTable(String tableName) {
+
+ LOGGER.info("Schema load -- DATABASE:{},\tTABLE:{}", name, tableName);
+
+ Table table = new Table(name, tableName);
+ tableMap.put(tableName, table);
+ }
+
+ public Table getTable(String tableName) {
+
+ return tableMap.get(tableName);
+ }
+
+ public Map<String, Table> getTableMap() {
+ return tableMap;
+ }
+}
diff --git a/src/main/java/org/apache/rocketmq/connect/cassandra/schema/Schema.java b/src/main/java/org/apache/rocketmq/connect/cassandra/schema/Schema.java
new file mode 100644
index 0000000..01054e0
--- /dev/null
+++ b/src/main/java/org/apache/rocketmq/connect/cassandra/schema/Schema.java
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.rocketmq.connect.cassandra.schema;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import com.datastax.oss.driver.api.core.CqlSession;
+import com.datastax.oss.driver.api.core.cql.ResultSet;
+import com.datastax.oss.driver.api.core.cql.Row;
+import com.datastax.oss.driver.api.core.cql.SimpleStatement;
+import com.datastax.oss.driver.api.querybuilder.select.Select;
+import com.datastax.oss.driver.api.querybuilder.select.SelectFrom;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.datastax.oss.driver.api.querybuilder.QueryBuilder;
+
+public class Schema {
+ private static final Logger LOGGER = LoggerFactory.getLogger(Schema.class);
+
+ private static final String CQL = "SELECT * FROM system_schema.keyspaces";
+
+ private static final String SCHEMA_SYSTEM_SCHEMA = "system_schema";
+
+ private static final String TABLE_KEYSPACES = "keyspaces";
+
+ private static final String COLUMN_KEYSPACE_NAME = "keyspace_name";
+
+ private static final List<String> IGNORED_DATABASES = new ArrayList<>(
+ Arrays.asList(new String[] {"system_schema", "system", "system_auth", "system_distributed", "system_traces"})
+ );
+
+ private CqlSession cqlSession;
+
+ private Map<String, Database> dbMap;
+
+ public Map<String, Set<String>> dbTableMap;
+
+ public Map<String, Map<String, String>> tableFilterMap;
+
+ public Schema(CqlSession cqlSession) {
+ this.cqlSession = cqlSession;
+ this.dbTableMap = new HashMap<>();
+ this.tableFilterMap = new HashMap<>();
+ }
+
+ public void load() throws Exception {
+
+ dbMap = new HashMap<>();
+
+ Select selectFrom = QueryBuilder.selectFrom(SCHEMA_SYSTEM_SCHEMA, TABLE_KEYSPACES)
+ .column(COLUMN_KEYSPACE_NAME);
+
+
+ SimpleStatement stmt;
+ boolean finishUpdate = false;
+ LOGGER.info("trying to execute sql query,{}", selectFrom.asCql());
+ ResultSet result = null;
+ try {
+ while (!cqlSession.isClosed() && !finishUpdate){
+ stmt = selectFrom.build();
+ result = cqlSession.execute(stmt);
+ if (result.wasApplied()) {
+ LOGGER.info("update table success, executed cql query {}", selectFrom.asCql());
+ }
+ finishUpdate = true;
+ }
+
+ for (Row row : result) {
+ String dbName = row.getString(COLUMN_KEYSPACE_NAME);
+ if (!IGNORED_DATABASES.contains(dbName) && dbTableMap.keySet().contains(dbName)) {
+ Database database = new Database(dbName, cqlSession, dbTableMap.get(dbName), tableFilterMap);
+ dbMap.put(dbName, database);
+ }
+ }
+ } catch (Exception e) {
+ LOGGER.error("init cassandra Schema failure,{}", e);
+ }
+
+ for (Database db : dbMap.values()) {
+ db.init();
+ }
+
+ }
+
+// public Table getTable(String dbName, String tableName) {
+//
+// if (dbMap == null) {
+// reload();
+// }
+//
+// Database database = dbMap.get(dbName);
+// if (database == null) {
+// return null;
+// }
+//
+// Table table = database.getTable(tableName);
+// if (table == null) {
+// return null;
+// }
+//
+// return table;
+// }
+
+ private void reload() {
+
+ while (true) {
+ try {
+ load();
+ break;
+ } catch (Exception e) {
+ LOGGER.error("Cassandra Source Connector reload schema error.", e);
+ }
+ }
+ }
+
+ public void reset() {
+ dbMap = null;
+ }
+
+ public Map<String, Database> getDbMap() {
+ return dbMap;
+ }
+}
+
diff --git a/src/main/java/org/apache/rocketmq/connect/cassandra/schema/Table.java b/src/main/java/org/apache/rocketmq/connect/cassandra/schema/Table.java
new file mode 100644
index 0000000..902b797
--- /dev/null
+++ b/src/main/java/org/apache/rocketmq/connect/cassandra/schema/Table.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+
+package org.apache.rocketmq.connect.cassandra.schema;
+
+import org.apache.rocketmq.connect.cassandra.schema.column.ColumnParser;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Description of one table: its keyspace, its name, and per-column lists of column
+ * names, column parsers, raw data types and data values, plus its filter settings.
+ *
+ * <p>Elements are appended to the column, parser and raw-type lists one column at a
+ * time, so the entries at the same position belong to the same column.
+ */
+public class Table {
+
+    private String database;
+    private String name;
+    // Array-backed lists: the source task reads these lists by position (get(i)),
+    // which costs a full traversal per access on a linked list.
+    private List<String> colList = new ArrayList<>();
+    private List<ColumnParser> parserList = new ArrayList<>();
+    private List<String> rawDataTypeList = new ArrayList<>();
+    private List<Object> dataList = new ArrayList<>();
+    private Map<String, String> filterMap = new HashMap<>();
+
+    /**
+     * @param database the keyspace the table belongs to
+     * @param table    the table name
+     */
+    public Table(String database, String table) {
+        this.database = database;
+        this.name = table;
+    }
+
+    /** Appends a column name. */
+    public void addCol(String column) {
+        colList.add(column);
+    }
+
+    public void setParserList(List<ColumnParser> parserList) {
+        this.parserList = parserList;
+    }
+
+    public void setRawDataTypeList(List<String> rawDataTypeList) {
+        this.rawDataTypeList = rawDataTypeList;
+    }
+
+    /** Appends the parser of the next column. */
+    public void addParser(ColumnParser columnParser) {
+        parserList.add(columnParser);
+    }
+
+    /** Appends the raw data type of the next column. */
+    public void addRawDataType(String rawDataType) {
+        this.rawDataTypeList.add(rawDataType);
+    }
+
+    public List<String> getColList() {
+        return colList;
+    }
+
+    public List<String> getRawDataTypeList() {
+        return rawDataTypeList;
+    }
+
+    public String getDatabase() {
+        return database;
+    }
+
+    public String getName() {
+        return name;
+    }
+
+    public List<ColumnParser> getParserList() {
+        return parserList;
+    }
+
+    public List<Object> getDataList() {
+        return dataList;
+    }
+
+    public void setDataList(List<Object> dataList) {
+        this.dataList = dataList;
+    }
+
+    public void setColList(List<String> colList) {
+        this.colList = colList;
+    }
+
+    public Map<String, String> getFilterMap() {
+        return filterMap;
+    }
+
+    public void setFilterMap(Map<String, String> filterMap) {
+        this.filterMap = filterMap;
+    }
+}
\ No newline at end of file
diff --git a/src/main/java/org/apache/rocketmq/connect/cassandra/schema/column/BigIntColumnParser.java b/src/main/java/org/apache/rocketmq/connect/cassandra/schema/column/BigIntColumnParser.java
new file mode 100644
index 0000000..0506469
--- /dev/null
+++ b/src/main/java/org/apache/rocketmq/connect/cassandra/schema/column/BigIntColumnParser.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.connect.cassandra.schema.column;
+
+import java.math.BigInteger;
+
+public class BigIntColumnParser extends ColumnParser {
+
+ private static BigInteger max = BigInteger.ONE.shiftLeft(64);
+
+ private boolean signed;
+
+ public BigIntColumnParser(String colType) {
+ this.signed = !colType.matches(".* unsigned$");
+ }
+
+ @Override
+ public Object getValue(Object value) {
+
+ if (value == null) {
+ return null;
+ }
+
+ if (value instanceof BigInteger) {
+ return value;
+ }
+
+ Long l = (Long) value;
+ if (!signed && l < 0) {
+ return max.add(BigInteger.valueOf(l));
+ } else {
+ return l;
+ }
+ }
+}
diff --git a/src/main/java/org/apache/rocketmq/connect/cassandra/schema/column/BooleanColumnParser.java b/src/main/java/org/apache/rocketmq/connect/cassandra/schema/column/BooleanColumnParser.java
new file mode 100644
index 0000000..8d5b2bb
--- /dev/null
+++ b/src/main/java/org/apache/rocketmq/connect/cassandra/schema/column/BooleanColumnParser.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.rocketmq.connect.cassandra.schema.column;
+
+/**
+ * Column parser for boolean columns; it performs no conversion.
+ */
+public class BooleanColumnParser extends ColumnParser {
+
+    /**
+     * Hands the raw value back unchanged, whether it is {@code null}, a
+     * {@link Boolean} or anything else.
+     *
+     * @param value the raw column value
+     * @return the very same reference that was passed in
+     */
+    @Override
+    public Object getValue(Object value) {
+        return value;
+    }
+}
diff --git a/src/main/java/org/apache/rocketmq/connect/cassandra/schema/column/ColumnParser.java b/src/main/java/org/apache/rocketmq/connect/cassandra/schema/column/ColumnParser.java
new file mode 100644
index 0000000..1eab587
--- /dev/null
+++ b/src/main/java/org/apache/rocketmq/connect/cassandra/schema/column/ColumnParser.java
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.rocketmq.connect.cassandra.schema.column;
+
+import io.openmessaging.connector.api.data.FieldType;
+
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+/**
+ * Base class of the column value parsers and factory for the concrete parser that
+ * matches a column's data type.
+ */
+public abstract class ColumnParser {
+
+    /**
+     * Matches a column type of the form {@code enum(...)} or {@code set(...)};
+     * group 2 captures the text between the parentheses. Compiled once because
+     * {@link Pattern} instances are immutable and reusable.
+     */
+    private static final Pattern ENUM_OR_SET_PATTERN = Pattern.compile("(enum|set)\\((.*)\\)");
+
+    /**
+     * Creates the parser for the given data type. Currently this class implementation
+     * is not complete yet: every data type that is not listed below is handled by
+     * {@link DefaultColumnParser}.
+     *
+     * @param dataType data type name used to select the parser; must not be {@code null}
+     * @param colType  full column type text, forwarded to the integer, enum and set parsers
+     * @param charset  charset name, forwarded to the string parser
+     * @return a new parser instance, never {@code null}
+     */
+    public static ColumnParser getColumnParser(String dataType, String colType, String charset) {
+
+        switch (dataType) {
+            case "tinyint":
+            case "smallint":
+            case "mediumint":
+            case "varint":
+            case "int":
+                return new IntColumnParser(dataType, colType);
+            case "bigint":
+                return new BigIntColumnParser(colType);
+            case "tinytext":
+            case "text":
+            case "mediumtext":
+            case "longtext":
+            case "varchar":
+            case "ascii":
+            case "char":
+                return new StringColumnParser(charset);
+            case "date":
+            case "datetime":
+            case "timestamp":
+                return new DateTimeColumnParser();
+            case "time":
+                return new TimeColumnParser();
+            case "year":
+                return new YearColumnParser();
+            case "enum":
+                return new EnumColumnParser(colType);
+            case "set":
+                return new SetColumnParser(colType);
+            case "boolean":
+                return new BooleanColumnParser();
+            default:
+                return new DefaultColumnParser();
+        }
+    }
+
+    /**
+     * Maps a data type name to the connector {@link FieldType}.
+     *
+     * @param dataType data type name; must not be {@code null}
+     * @return the mapped field type; {@code null} for {@code enum} and {@code set};
+     *         {@link FieldType#BYTES} for every type that is not listed
+     */
+    public static FieldType mapConnectorFieldType(String dataType) {
+
+        switch (dataType) {
+            case "tinyint":
+            case "smallint":
+            case "mediumint":
+            case "int":
+                return FieldType.INT32;
+            case "bigint":
+                return FieldType.BIG_INTEGER;
+            case "tinytext":
+            case "text":
+            case "mediumtext":
+            case "longtext":
+            case "varchar":
+            case "char":
+                return FieldType.STRING;
+            case "date":
+            case "datetime":
+            case "timestamp":
+            case "time":
+            case "year":
+                return FieldType.DATETIME;
+            case "enum":
+            case "set":
+                // No connector field type is mapped for these two; callers receive null.
+                return null;
+            default:
+                return FieldType.BYTES;
+        }
+    }
+
+    /**
+     * Extracts the member names from a column type such as {@code enum('a','b')} or
+     * {@code set('a','b')}. All single quotes are stripped and the remainder is split
+     * on commas, so member names that themselves contain a comma are not supported.
+     *
+     * @param colType full column type text, may be {@code null}
+     * @return the member names in declaration order, or an empty array when
+     *         {@code colType} is {@code null} or is not an enum/set type
+     */
+    public static String[] extractEnumValues(String colType) {
+        if (colType == null) {
+            // Matcher creation would throw a NullPointerException; treat as "no members".
+            return new String[0];
+        }
+
+        Matcher matcher = ENUM_OR_SET_PATTERN.matcher(colType);
+        if (!matcher.matches()) {
+            return new String[0];
+        }
+
+        return matcher.group(2).replace("'", "").split(",");
+    }
+
+    /**
+     * Converts a raw column value into the representation handed to the connector.
+     *
+     * @param value raw column value, may be {@code null}
+     * @return the converted value
+     */
+    public abstract Object getValue(Object value);
+
+}
diff --git a/src/main/java/org/apache/rocketmq/connect/cassandra/schema/column/DateTimeColumnParser.java b/src/main/java/org/apache/rocketmq/connect/cassandra/schema/column/DateTimeColumnParser.java
new file mode 100644
index 0000000..b110a19
--- /dev/null
+++ b/src/main/java/org/apache/rocketmq/connect/cassandra/schema/column/DateTimeColumnParser.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.connect.cassandra.schema.column;
+
+import java.sql.Timestamp;
+import java.text.SimpleDateFormat;
+import java.util.Date;
+import java.util.TimeZone;
+
+public class DateTimeColumnParser extends ColumnParser {
+
+ private static SimpleDateFormat dateTimeFormat;
+ private static SimpleDateFormat dateTimeUtcFormat;
+
+ static {
+ dateTimeFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
+ dateTimeUtcFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
+ dateTimeUtcFormat.setTimeZone(TimeZone.getTimeZone("GMT+8"));
+ }
+
+ @Override
+ public Object getValue(Object value) {
+
+ if (value == null) {
+ return null;
+ }
+
+ if (value instanceof Timestamp) {
+ return dateTimeFormat.format(value);
+ }
+
+ if (value instanceof Long) {
+ return dateTimeUtcFormat.format(new Date((Long) value));
+ }
+
+ return value;
+ }
+}
diff --git a/src/main/java/org/apache/rocketmq/connect/cassandra/schema/column/DefaultColumnParser.java b/src/main/java/org/apache/rocketmq/connect/cassandra/schema/column/DefaultColumnParser.java
new file mode 100644
index 0000000..01d8d1a
--- /dev/null
+++ b/src/main/java/org/apache/rocketmq/connect/cassandra/schema/column/DefaultColumnParser.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.connect.cassandra.schema.column;
+
+import org.apache.commons.codec.binary.Base64;
+
+public class DefaultColumnParser extends ColumnParser {
+
+ @Override
+ public Object getValue(Object value) {
+
+ if (value == null) {
+ return null;
+ }
+
+ if (value instanceof byte[]) {
+ return Base64.encodeBase64String((byte[]) value);
+ }
+
+ return value;
+ }
+}
diff --git a/src/main/java/org/apache/rocketmq/connect/cassandra/schema/column/EnumColumnParser.java b/src/main/java/org/apache/rocketmq/connect/cassandra/schema/column/EnumColumnParser.java
new file mode 100644
index 0000000..245dfd6
--- /dev/null
+++ b/src/main/java/org/apache/rocketmq/connect/cassandra/schema/column/EnumColumnParser.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.connect.cassandra.schema.column;
+
+public class EnumColumnParser extends ColumnParser {
+
+ private String[] enumValues;
+
+ public EnumColumnParser(String colType) {
+ enumValues = extractEnumValues(colType);
+ }
+
+ @Override
+ public Object getValue(Object value) {
+
+ if (value == null) {
+ return null;
+ }
+
+ if (value instanceof String) {
+ return value;
+ }
+
+ Integer i = (Integer) value;
+ if (i == 0) {
+ return null;
+ } else {
+ return enumValues[i - 1];
+ }
+ }
+}
diff --git a/src/main/java/org/apache/rocketmq/connect/cassandra/schema/column/IntColumnParser.java b/src/main/java/org/apache/rocketmq/connect/cassandra/schema/column/IntColumnParser.java
new file mode 100644
index 0000000..2257682
--- /dev/null
+++ b/src/main/java/org/apache/rocketmq/connect/cassandra/schema/column/IntColumnParser.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.connect.cassandra.schema.column;
+
+/**
+ * Parser for fixed-width integer columns. For columns declared {@code unsigned} a
+ * negative {@link Integer} is shifted back into the non-negative range of the column width.
+ */
+public class IntColumnParser extends ColumnParser {
+
+    /** Width of the column in bits; stays 0 for data types without a known width. */
+    private int bits;
+    /** {@code false} only when the column type text ends with " unsigned". */
+    private boolean signed;
+
+    /**
+     * @param dataType data type name that determines the bit width
+     * @param colType  full column type text; {@code null} is treated as signed
+     */
+    public IntColumnParser(String dataType, String colType) {
+
+        switch (dataType) {
+            case "tinyint":
+                bits = 8;
+                break;
+            case "smallint":
+                bits = 16;
+                break;
+            case "mediumint":
+                bits = 24;
+                break;
+            case "int":
+                bits = 32;
+                break;
+            default:
+                // NOTE(review): any other data type leaves bits at 0, so the unsigned
+                // conversion below would add 1 instead of 2^bits; confirm whether such
+                // types can ever be declared unsigned.
+                break;
+        }
+
+        this.signed = colType == null || !colType.matches(".* unsigned$");
+    }
+
+    /**
+     * Converts a raw integer value.
+     *
+     * @param value raw column value, may be {@code null}
+     * @return for a negative {@link Integer} of an unsigned column, a {@link Long} holding
+     *         {@code 2^bits + value}; every other value (including {@code null}, zero and
+     *         {@link Long} values) unchanged
+     */
+    @Override
+    public Object getValue(Object value) {
+
+        if (value instanceof Integer) {
+            Integer i = (Integer) value;
+            // ">= 0": zero is already a valid unsigned value and must not be wrapped
+            // around to 2^bits.
+            if (signed || i >= 0) {
+                return i;
+            }
+            return (1L << bits) + i;
+        }
+
+        // null, Long and any other type are passed through untouched.
+        return value;
+    }
+}
diff --git a/src/main/java/org/apache/rocketmq/connect/cassandra/schema/column/SetColumnParser.java b/src/main/java/org/apache/rocketmq/connect/cassandra/schema/column/SetColumnParser.java
new file mode 100644
index 0000000..eaa6dad
--- /dev/null
+++ b/src/main/java/org/apache/rocketmq/connect/cassandra/schema/column/SetColumnParser.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.connect.cassandra.schema.column;
+
+/**
+ * Parser for set columns that expands a bitmask into a comma separated list of
+ * member names.
+ */
+public class SetColumnParser extends ColumnParser {
+
+    /** Member names taken from the column type; position n corresponds to bit n. */
+    private String[] enumValues;
+
+    /**
+     * @param colType full column type text from which the member names are extracted
+     */
+    public SetColumnParser(String colType) {
+        this.enumValues = extractEnumValues(colType);
+    }
+
+    /**
+     * Expands a set value.
+     *
+     * @param value {@code null}, the textual set value, or a {@link Long} bitmask
+     * @return {@code null} and {@link String} values unchanged; otherwise the names of all
+     *         members whose bit is set, joined with "," (empty text when no bit is set)
+     */
+    @Override
+    public Object getValue(Object value) {
+        if (value == null || value instanceof String) {
+            return value;
+        }
+
+        StringBuilder joined = new StringBuilder();
+        long bitmask = (Long) value;
+
+        boolean first = true;
+        for (int bit = 0; bit < enumValues.length; bit++) {
+            boolean selected = ((bitmask >> bit) & 1) == 1;
+            if (!selected) {
+                continue;
+            }
+            // Separator goes in front of every member except the first selected one.
+            if (!first) {
+                joined.append(",");
+            }
+            joined.append(enumValues[bit]);
+            first = false;
+        }
+
+        return joined.toString();
+    }
+}
diff --git a/src/main/java/org/apache/rocketmq/connect/cassandra/schema/column/StringColumnParser.java b/src/main/java/org/apache/rocketmq/connect/cassandra/schema/column/StringColumnParser.java
new file mode 100644
index 0000000..2bd7a36
--- /dev/null
+++ b/src/main/java/org/apache/rocketmq/connect/cassandra/schema/column/StringColumnParser.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.connect.cassandra.schema.column;
+
+import org.apache.commons.codec.Charsets;
+
+/**
+ * Parser for text columns that decodes raw bytes with the column's charset.
+ */
+public class StringColumnParser extends ColumnParser {
+
+    /** Lower-cased charset name used to pick the decoder. */
+    private final String charset;
+
+    /**
+     * @param charset charset name of the column; a {@code null} name falls back to
+     *                "utf8" instead of failing with a {@link NullPointerException}
+     *                (NOTE(review): confirm UTF-8 is the right default for callers that
+     *                pass no charset)
+     */
+    public StringColumnParser(String charset) {
+        // Locale.ROOT keeps the lower-casing independent of the JVM default locale;
+        // with e.g. a Turkish default locale "LATIN1" would otherwise not become "latin1".
+        this.charset = charset == null ? "utf8" : charset.toLowerCase(java.util.Locale.ROOT);
+    }
+
+    /**
+     * Decodes a text value.
+     *
+     * @param value {@code null}, a {@link String}, or the raw {@code byte[]} of the text
+     * @return {@code null} and {@link String} values unchanged; otherwise the bytes decoded
+     *         with the column charset
+     */
+    @Override
+    public Object getValue(Object value) {
+
+        if (value == null) {
+            return null;
+        }
+
+        if (value instanceof String) {
+            return value;
+        }
+
+        byte[] bytes = (byte[]) value;
+
+        switch (charset) {
+            case "utf8":
+            case "utf8mb4":
+                return new String(bytes, Charsets.UTF_8);
+            case "latin1":
+            case "ascii":
+                return new String(bytes, Charsets.ISO_8859_1);
+            case "ucs2":
+                return new String(bytes, Charsets.UTF_16);
+            default:
+                // Any other name is resolved as a regular Java charset name.
+                return new String(bytes, Charsets.toCharset(charset));
+
+        }
+    }
+}
diff --git a/src/main/java/org/apache/rocketmq/connect/cassandra/schema/column/TimeColumnParser.java b/src/main/java/org/apache/rocketmq/connect/cassandra/schema/column/TimeColumnParser.java
new file mode 100644
index 0000000..c812a53
--- /dev/null
+++ b/src/main/java/org/apache/rocketmq/connect/cassandra/schema/column/TimeColumnParser.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.connect.cassandra.schema.column;
+
+import java.sql.Time;
+import java.sql.Timestamp;
+
+public class TimeColumnParser extends ColumnParser {
+
+ @Override
+ public Object getValue(Object value) {
+
+ if (value == null) {
+ return null;
+ }
+
+ if (value instanceof Timestamp) {
+
+ return new Time(((Timestamp) value).getTime());
+ }
+
+ return value;
+ }
+}
diff --git a/src/main/java/org/apache/rocketmq/connect/cassandra/schema/column/YearColumnParser.java b/src/main/java/org/apache/rocketmq/connect/cassandra/schema/column/YearColumnParser.java
new file mode 100644
index 0000000..82d61a8
--- /dev/null
+++ b/src/main/java/org/apache/rocketmq/connect/cassandra/schema/column/YearColumnParser.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.connect.cassandra.schema.column;
+
+import java.sql.Date;
+import java.util.Calendar;
+
+/**
+ * Parser for year columns that reduces a {@link Date} to its calendar year.
+ */
+public class YearColumnParser extends ColumnParser {
+
+    /**
+     * Converts a year value.
+     *
+     * @param value raw column value, may be {@code null}
+     * @return the calendar year (per the default {@link Calendar}) of a {@link Date};
+     *         any other value (including {@code null}) unchanged
+     */
+    @Override
+    public Object getValue(Object value) {
+        // instanceof is false for null, so null takes the pass-through branch.
+        if (!(value instanceof Date)) {
+            return value;
+        }
+
+        Calendar cal = Calendar.getInstance();
+        cal.setTime((Date) value);
+        return cal.get(Calendar.YEAR);
+    }
+}
diff --git a/src/main/java/org/apache/rocketmq/connect/cassandra/sink/Updater.java b/src/main/java/org/apache/rocketmq/connect/cassandra/sink/Updater.java
new file mode 100644
index 0000000..1c3f2c1
--- /dev/null
+++ b/src/main/java/org/apache/rocketmq/connect/cassandra/sink/Updater.java
@@ -0,0 +1,216 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.rocketmq.connect.cassandra.sink;
+
+
+import com.datastax.oss.driver.api.core.CqlSession;
+import com.datastax.oss.driver.api.core.cql.ResultSet;
+import com.datastax.oss.driver.api.core.cql.SimpleStatement;
+import com.datastax.oss.driver.api.core.type.DataType;
+import com.datastax.oss.driver.api.core.type.DataTypes;
+import com.datastax.oss.driver.api.querybuilder.QueryBuilder;
+import com.datastax.oss.driver.api.querybuilder.delete.Delete;
+import com.datastax.oss.driver.api.querybuilder.delete.DeleteSelection;
+import com.datastax.oss.driver.api.querybuilder.insert.InsertInto;
+import com.datastax.oss.driver.api.querybuilder.insert.RegularInsert;
+import com.datastax.oss.driver.api.querybuilder.term.Term;
+import io.openmessaging.connector.api.data.EntryType;
+import io.openmessaging.connector.api.data.Field;
+import io.openmessaging.connector.api.data.FieldType;
+import org.apache.rocketmq.connect.cassandra.config.Config;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+import java.util.Queue;
+import java.util.concurrent.ConcurrentLinkedQueue;
+
+public class Updater {
+
+ private final Logger log = LoggerFactory.getLogger(getClass());
+ private final Queue<CqlSession> connections = new ConcurrentLinkedQueue<>();
+ private Config config;
+ private CqlSession cqlSession;
+
+ private static final int BEFORE_UPDATE = 0;
+ private static final int AFTER_UPDATE = 1;
+
+ public Updater(Config config, CqlSession cqlSession) {
+ this.config = config;
+ this.cqlSession = cqlSession;
+ }
+
+ /**
+ * We cannot know the primary key of each table, so we have to put every field in the where clause
+ * @param dbName
+ * @param tableName
+ * @param fieldMap
+ * @param entryType
+ * @return
+ */
+ public boolean push(String dbName, String tableName, Map<Field, Object[]> fieldMap, EntryType entryType) {
+ log.info("Updater Trying to push data");
+ Boolean isSuccess = false;
+ boolean afterUpdateExist;
+ boolean beforeUpdateExist;
+ switch (entryType) {
+ case CREATE:
+ isSuccess = updateRow(dbName, tableName, fieldMap);
+ break;
+ case UPDATE:
+ isSuccess = updateRow(dbName, tableName, fieldMap);
+ break;
+ case DELETE:
+ isSuccess = deleteRow(dbName, tableName, fieldMap);
+ break;
+ default:
+ log.error("entryType {} is illegal.", entryType.toString());
+ }
+ return isSuccess;
+ }
+
+ public void start() throws Exception {
+ log.info("schema load success");
+ }
+
+ public Config getConfig() {
+ return config;
+ }
+
+ public void setConfig(Config config) {
+ this.config = config;
+ }
+
+
+ /** Since we have no way of getting the id of a record, and we cannot get the primary key list of a table,
+ * even we can it is not extensible. So we the result sql sentense would be like
+ * UPDATE dbName.tableName SET afterUpdateValues WHERE beforeUpdateValues.
+ *
+ */
+ private Boolean updateRow(String dbName, String tableName, Map<Field, Object[]> fieldMap) {
+ log.info("Updater.updateRow() get called ");
+ int count = 0;
+ InsertInto insert = QueryBuilder.insertInto(dbName, tableName);
+ RegularInsert regularInsert = null;
+ for (Map.Entry<Field, Object[]> entry : fieldMap.entrySet()) {
+ count++;
+ String fieldName = entry.getKey().getName();
+ FieldType fieldType = entry.getKey().getType();
+ Object fieldValue = entry.getValue()[1];
+ if (count == 1) {
+ regularInsert = insert.value(fieldName, buildTerm(fieldType, fieldValue));
+ }
+ else {
+ regularInsert = regularInsert.value(fieldName, buildTerm(fieldType, fieldValue));
+ }
+ }
+
+
+ SimpleStatement stmt;
+ boolean finishUpdate = false;
+ log.info("trying to execute sql query,{}", regularInsert.asCql());
+ try {
+ while (!cqlSession.isClosed() && !finishUpdate){
+ stmt = regularInsert.build();
+ ResultSet result = cqlSession.execute(stmt);
+ if (result.wasApplied()) {
+ log.info("update table success, executed cql query {}", regularInsert.asCql());
+ return true;
+ }
+ finishUpdate = true;
+ }
+ } catch (Exception e) {
+ log.error("update table error,{}", e);
+ }
+ return false;
+ }
+
+
+ private boolean deleteRow(String dbName, String tableName, Map<Field, Object[]> fieldMap) {
+ DeleteSelection deleteSelection = QueryBuilder.deleteFrom(dbName, tableName);
+ Delete delete = null;
+ int count = 0;
+ for (Map.Entry<Field, Object[]> entry : fieldMap.entrySet()) {
+ count++;
+ String fieldName = entry.getKey().getName();
+ FieldType fieldType = entry.getKey().getType();
+ Object fieldValue = entry.getValue()[1];
+ if (count == 1) {
+ delete = deleteSelection.whereColumn(fieldName)
+ .isEqualTo(buildTerm(fieldType, fieldValue));
+ }
+ else {
+ delete = delete.whereColumn(fieldName)
+ .isEqualTo(buildTerm(fieldType, fieldValue));
+ }
+ }
+
+ boolean finishDelete = false;
+ SimpleStatement stmt = delete.build();
+ try {
+ while (!cqlSession.isClosed() && !finishDelete){
+ ResultSet result = cqlSession.execute(stmt);
+ if (result.wasApplied()) {
+ log.info("delete from table success, executed query {}", delete);
+ return true;
+ }
+ finishDelete = true;
+ }
+ } catch (Exception e) {
+ log.error("delete from table error,{}", e);
+ }
+ return false;
+ }
+
+
+ /**
+ * Cassandra datastax driver automatically
+ * infer type from literal value, or we can use "typeHints" to
+ * tell datastax driver what type should this literal be. We will add
+ * type hints utils once we found it is necessary to do so. For now literal
+ * inference should be enough.
+ *
+ * @param fieldType
+ * @param fieldValue
+ * @return
+ */
+ private Term buildTerm(FieldType fieldType, Object fieldValue) {
+ return QueryBuilder.literal(fieldValue);
+ }
+ private DataType typeParser(FieldType fieldType) {
+ DataType dataType = null;
+ switch (fieldType) {
+ case STRING:
+ break;
+ case DATETIME:
+ break;
+ case INT32:
+ case INT64:
+ case FLOAT32:
+ case FLOAT64:
+ case BIG_INTEGER:
+ dataType = DataTypes.BIGINT;
+ break;
+ default:
+ log.error("fieldType {} is illegal.", fieldType.toString());
+ }
+ return dataType;
+ }
+
+}
diff --git a/src/main/java/org/apache/rocketmq/connect/cassandra/source/Querier.java b/src/main/java/org/apache/rocketmq/connect/cassandra/source/Querier.java
new file mode 100644
index 0000000..8a8ddf4
--- /dev/null
+++ b/src/main/java/org/apache/rocketmq/connect/cassandra/source/Querier.java
@@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.rocketmq.connect.cassandra.source;
+
+import com.alibaba.fastjson.JSONObject;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Queue;
+import java.util.concurrent.ConcurrentLinkedQueue;
+
+import com.datastax.oss.driver.api.core.CqlSession;
+import com.datastax.oss.driver.api.core.cql.ResultSet;
+import com.datastax.oss.driver.api.core.cql.Row;
+import com.datastax.oss.driver.api.core.cql.SimpleStatement;
+import com.datastax.oss.driver.api.querybuilder.QueryBuilder;
+import com.datastax.oss.driver.api.querybuilder.select.Select;
+import com.datastax.oss.driver.api.querybuilder.select.SelectFrom;
+import org.apache.rocketmq.connect.cassandra.config.Config;
+import org.apache.rocketmq.connect.cassandra.schema.Database;
+import org.apache.rocketmq.connect.cassandra.schema.Schema;
+import org.apache.rocketmq.connect.cassandra.schema.Table;
+import org.apache.rocketmq.connect.cassandra.schema.column.ColumnParser;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Reads the rows of the configured tables from Cassandra through a {@link CqlSession}
+ * and keeps the result of the latest poll as a list of {@link Table} objects, one per row.
+ */
+public class Querier {
+
+    private final Logger log = LoggerFactory.getLogger(Querier.class); // use concrete subclass
+    protected String topicPrefix;
+    private final Queue<CqlSession> cqlSessions = new ConcurrentLinkedQueue<>();
+    private Config config;
+    private CqlSession cqlSession;
+    private List<Table> list = new LinkedList<>();
+    private String mode;
+    private Schema schema;
+
+    public Querier(){
+
+    }
+
+    public Querier(Config config, CqlSession cqlSession) {
+        this.config = config;
+        this.cqlSession = cqlSession;
+        this.schema = new Schema(cqlSession);
+    }
+
+    public Config getConfig() {
+        return config;
+    }
+
+    public void setConfig(Config config) {
+        this.config = config;
+    }
+
+    /**
+     * @return the rows collected by the most recent successful {@link #poll()}
+     */
+    public List<Table> getList() {
+        return list;
+    }
+
+
+    /**
+     * Queries every table of the loaded schema and replaces the current result list with
+     * one {@link Table} per returned row. On any failure (including a closed session)
+     * the error is logged and the previous result list is kept.
+     */
+    public void poll() {
+        try {
+            LinkedList<Table> tableLinkedList = new LinkedList<>();
+
+            for (Map.Entry<String, Database> entry : schema.getDbMap().entrySet()) {
+                String dbName = entry.getKey();
+                for (Map.Entry<String, Table> tableEntry : entry.getValue().getTableMap().entrySet()) {
+                    String tableName = tableEntry.getKey();
+                    Table table = tableEntry.getValue();
+                    Map<String, String> tableFilterMap = table.getFilterMap();
+
+                    Select selectFrom = QueryBuilder.selectFrom(dbName, tableName).all();
+                    if (tableFilterMap != null && !tableFilterMap.containsKey("NO-FILTER")) {
+                        for (Map.Entry<String, String> filter : tableFilterMap.entrySet()) {
+                            // Query builder objects are immutable: the restricted query is
+                            // the returned object, so it must be assigned back.
+                            // NOTE(review): Cassandra may reject restrictions on columns
+                            // that are neither key nor indexed unless ALLOW FILTERING is
+                            // used - confirm the configured filter columns.
+                            selectFrom = selectFrom.whereColumn(filter.getKey())
+                                .isEqualTo(QueryBuilder.literal(filter.getValue()));
+                        }
+                    }
+
+                    log.info("trying to execute sql query,{}", selectFrom.asCql());
+                    if (cqlSession.isClosed()) {
+                        // Nothing can be queried any more; keep the previous result list.
+                        log.error("fail to poll data, cql session is closed");
+                        return;
+                    }
+                    SimpleStatement stmt = selectFrom.build();
+                    ResultSet result = cqlSession.execute(stmt);
+                    if (result.wasApplied()) {
+                        log.info("query columns success, executed cql query {}", selectFrom.asCql());
+                    }
+
+                    List<String> colList = table.getColList();
+                    List<String> dataTypeList = table.getRawDataTypeList();
+                    List<ColumnParser> parserList = table.getParserList();
+
+                    for (Row row : result) {
+                        // Each row becomes its own Table carrying the shared column metadata.
+                        Table tableWithData = new Table(dbName, tableName);
+                        tableWithData.setColList(colList);
+                        tableWithData.setRawDataTypeList(dataTypeList);
+                        tableWithData.setParserList(parserList);
+
+                        for (String col : colList) {
+                            tableWithData.getDataList().add(row.getObject(col));
+                        }
+                        tableLinkedList.add(tableWithData);
+                    }
+                }
+            }
+            list = tableLinkedList;
+        } catch (Exception e) {
+            log.error("fail to poll data, {}", e);
+        }
+
+    }
+
+    /**
+     * Parses the white-list configuration (a JSON object of the form
+     * {database: {table: {filterKey: filterValue}}}), registers the tables and their
+     * filters with the schema, and loads the schema.
+     *
+     * @throws Exception when the configuration cannot be parsed or the schema fails to load
+     */
+    public void start() throws Exception {
+        String whiteDataBases = config.getWhiteDataBase();
+        JSONObject whiteDataBaseObject = JSONObject.parseObject(whiteDataBases);
+
+        if (whiteDataBaseObject != null){
+            for (String whiteDataBaseName : whiteDataBaseObject.keySet()){
+                JSONObject whiteTableObject = (JSONObject)whiteDataBaseObject.get(whiteDataBaseName);
+                HashSet<String> whiteTableSet = new HashSet<>();
+                for (String whiteTableName : whiteTableObject.keySet()){
+                    whiteTableSet.add(whiteTableName);
+                    HashMap<String, String> filterMap = new HashMap<>();
+                    JSONObject tableFilterObject = JSONObject.parseObject(whiteTableObject.get(whiteTableName).toString());
+                    for(String filterKey : tableFilterObject.keySet()){
+                        filterMap.put(filterKey, tableFilterObject.getString(filterKey));
+                    }
+                    // Filters are keyed by table name only, not by database and table.
+                    schema.tableFilterMap.put(whiteTableName, filterMap);
+                }
+                schema.dbTableMap.put(whiteDataBaseName, whiteTableSet);
+            }
+        }
+        schema.load();
+        log.info("load schema success");
+    }
+}
diff --git a/src/main/java/org/apache/rocketmq/connect/cassandra/strategy/DivideStrategyEnum.java b/src/main/java/org/apache/rocketmq/connect/cassandra/strategy/DivideStrategyEnum.java
new file mode 100644
index 0000000..f0eac2b
--- /dev/null
+++ b/src/main/java/org/apache/rocketmq/connect/cassandra/strategy/DivideStrategyEnum.java
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.connect.cassandra.strategy;
+
+/**
+ * Names the available task division strategies.
+ */
+public enum DivideStrategyEnum {
+    /** Division keyed on topics. */
+    BY_TOPIC,
+    /** Division keyed on queues. */
+    BY_QUEUE;
+}
diff --git a/src/main/java/org/apache/rocketmq/connect/cassandra/strategy/DivideTaskByTopic.java b/src/main/java/org/apache/rocketmq/connect/cassandra/strategy/DivideTaskByTopic.java
new file mode 100644
index 0000000..68395d3
--- /dev/null
+++ b/src/main/java/org/apache/rocketmq/connect/cassandra/strategy/DivideTaskByTopic.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.connect.cassandra.strategy;
+
+import com.alibaba.fastjson.JSONObject;
+import io.openmessaging.KeyValue;
+import io.openmessaging.connector.api.Task;
+import io.openmessaging.internal.DefaultKeyValue;
+import org.apache.rocketmq.connect.cassandra.config.*;
+
+
+import java.util.*;
+
+public class DivideTaskByTopic extends TaskDivideStrategy {
+ @Override
+ public List<KeyValue> divide(DbConnectorConfig dbConnectorConfig, TaskDivideConfig tdc) {
+ if (dbConnectorConfig instanceof SourceDbConnectorConfig){
+ return divideSourceTaskByTopic(dbConnectorConfig, tdc);
+ }else {
+ return divideSinkTaskByTopic(dbConnectorConfig, tdc);
+ }
+ }
+
+ private List<KeyValue> divideSinkTaskByTopic(DbConnectorConfig dbConnectorConfig, TaskDivideConfig tdc) {
+ List<KeyValue> config = new ArrayList<KeyValue>();
+ int parallelism = tdc.getTaskParallelism();
+ int id = -1;
+ Set<String> topicRouteSet = ((SinkDbConnectorConfig)dbConnectorConfig).getWhiteTopics();
+ Map<Integer, StringBuilder> taskTopicList = new HashMap<>();
+ for (String topicName : topicRouteSet) {
+ int ind = ++id % parallelism;
+ if (!taskTopicList.containsKey(ind)) {
+ taskTopicList.put(ind, new StringBuilder(topicName));
+ } else {
+ taskTopicList.get(ind).append(",").append(topicName);
+ }
+ }
+
+ for (int i = 0; i < parallelism; i++) {
+ KeyValue keyValue = new DefaultKeyValue();
+ keyValue.put(Config.CONN_DB_IP, tdc.getDbUrl());
+ keyValue.put(Config.CONN_DB_PORT, tdc.getDbPort());
+ keyValue.put(Config.CONN_DB_USERNAME, tdc.getDbUserName());
+ keyValue.put(Config.CONN_DB_PASSWORD, tdc.getDbPassword());
+ keyValue.put(Config.CONN_DB_DATACENTER, tdc.getLocalDataCenter());
+ keyValue.put(Config.CONN_TOPIC_NAMES, taskTopicList.get(i).toString());
+ keyValue.put(Config.CONN_DATA_TYPE, tdc.getDataType());
+ keyValue.put(Config.CONN_SOURCE_RECORD_CONVERTER, tdc.getSrcRecordConverter());
+ keyValue.put(Config.CONN_DB_MODE, tdc.getMode());
+ config.add(keyValue);
+ }
+
+ return config;
+ }
+
+ private List<KeyValue> divideSourceTaskByTopic(DbConnectorConfig dbConnectorConfig, TaskDivideConfig tdc) {
+ List<KeyValue> config = new ArrayList<KeyValue>();
+ int parallelism = tdc.getTaskParallelism();
+ int id = -1;
+ Map<String, String> topicRouteMap = ((SourceDbConnectorConfig)dbConnectorConfig).getWhiteTopics();
+ Map<Integer, Map<String, Map<String, String>>> taskTopicList = new HashMap<>();
+ for (Map.Entry<String, String> entry : topicRouteMap.entrySet()) {
+ int ind = ++id % parallelism;
+ if (!taskTopicList.containsKey(ind)) {
+ taskTopicList.put(ind, new HashMap<>());
+ }
+ String dbKey = entry.getKey().split("-")[0];
+ String tableKey = entry.getKey().split("-")[1];
+ String filter = entry.getValue();
+ Map<String, String> tableMap = new HashMap<>();
+ tableMap.put(tableKey, filter);
+ if(!taskTopicList.get(ind).containsKey(dbKey)){
+ taskTopicList.get(ind).put(dbKey, tableMap);
+ }else {
+ taskTopicList.get(ind).get(dbKey).putAll(tableMap);
+ }
+ }
+
+ for (int i = 0; i < parallelism; i++) {
+ KeyValue keyValue = new DefaultKeyValue();
+
+ keyValue.put(Config.CONN_DB_IP, tdc.getDbUrl());
+ keyValue.put(Config.CONN_DB_PORT, tdc.getDbPort());
+ keyValue.put(Config.CONN_DB_USERNAME, tdc.getDbUserName());
+ keyValue.put(Config.CONN_DB_PASSWORD, tdc.getDbPassword());
+ keyValue.put(Config.CONN_DB_DATACENTER, tdc.getLocalDataCenter());
+ keyValue.put(Config.CONN_WHITE_LIST, JSONObject.toJSONString(taskTopicList.get(i)));
+ keyValue.put(Config.CONN_DATA_TYPE, tdc.getDataType());
+ keyValue.put(Config.CONN_SOURCE_RECORD_CONVERTER, tdc.getSrcRecordConverter());
+ keyValue.put(Config.CONN_DB_MODE, tdc.getMode());
+ config.add(keyValue);
+ }
+
+ return config;
+ }
+}
diff --git a/src/main/java/org/apache/rocketmq/connect/cassandra/strategy/TaskDivideStrategy.java b/src/main/java/org/apache/rocketmq/connect/cassandra/strategy/TaskDivideStrategy.java
new file mode 100644
index 0000000..70a6773
--- /dev/null
+++ b/src/main/java/org/apache/rocketmq/connect/cassandra/strategy/TaskDivideStrategy.java
@@ -0,0 +1,32 @@
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.connect.cassandra.strategy;
+
+import io.openmessaging.KeyValue;
+import io.openmessaging.connector.api.Task;
+import org.apache.rocketmq.connect.cassandra.config.DbConnectorConfig;
+import org.apache.rocketmq.connect.cassandra.config.TaskDivideConfig;
+import org.apache.rocketmq.connect.cassandra.config.TaskTopicInfo;
+
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Base type for strategies that turn a connector's configuration into the
+ * list of configurations handed to its tasks.
+ */
+public abstract class TaskDivideStrategy {
+
+    /**
+     * Produces the task configurations for a connector.
+     *
+     * @param dbConnectorConfig connector-level configuration
+     * @param tdc               task division settings
+     * @return the task configurations as key/value sets
+     */
+    public abstract List<KeyValue> divide(DbConnectorConfig dbConnectorConfig, TaskDivideConfig tdc);
+}