You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by zh...@apache.org on 2022/03/02 03:20:19 UTC
[rocketmq-connect] 11/43: Add JdbcSourceTask and Schema
This is an automated email from the ASF dual-hosted git repository.
zhoubo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-connect.git
commit 8ecde453ec4680fd8b37538bd7bd6312cce7fc00
Author: yuchenlichuck <yu...@126.com>
AuthorDate: Fri Aug 2 00:20:44 2019 +0800
Add JdbcSourceTask and Schema
---
README.md | 113 ++++++++++++++++++++-
lib/mysql-connector-java-8.0.11.jar | Bin 0 -> 2036609 bytes
pom.xml | 39 ++++++-
.../org/apache/rocketmq/connect/jdbc/Config.java | 13 +--
.../jdbc/connector/JdbcSourceConnector.java | 9 +-
.../connect/jdbc/connector/JdbcSourceTask.java | 16 +--
.../rocketmq/connect/jdbc/schema/Database.java | 2 -
.../rocketmq/connect/jdbc/source/Querier.java | 8 +-
.../rocketmq/connect/jdbc/ReplicatorTest.java | 74 ++++++++++++++
.../jdbc/connector/JdbcSourceConnectorTest.java | 6 +-
10 files changed, 254 insertions(+), 26 deletions(-)
diff --git a/README.md b/README.md
index f02d838..96da884 100644
--- a/README.md
+++ b/README.md
@@ -60,9 +60,9 @@
- For example
```javascript
-SourceDataEntry{sourcePartition=java.nio.HeapByteBuffer[pos=0 lim=14 cap=14], sourcePosition=java.nio.HeapByteBuffer[pos=0 lim=44 cap=44]} DataEntry{timestamp=1564397062419, entryType=CREATE, queueName='student', shardingKey='null',
-schema=Schema{dataSource='jdbc_db', name='student', fields=[Field{index=0, name='id', type=INT32}, Field{index=1, name='first', type=STRING},
-Field{index=2, name='last', type=STRING}, Field{index=3, name='age', type=INT32}]}, payload=[102121, "Python", "Py", 25]}
+ SourceDataEntry{sourcePartition=java.nio.HeapByteBuffer[pos=0 lim=14 cap=14], sourcePosition=java.nio.HeapByteBuffer[pos=0 lim=44 cap=44]} DataEntry{timestamp=1564397062419, entryType=CREATE, queueName='student', shardingKey='null',
+ schema=Schema{dataSource='jdbc_db', name='student', fields=[Field{index=0, name='id', type=INT32}, Field{index=1, name='first', type=STRING},
+ Field{index=2, name='last', type=STRING}, Field{index=3, name='age', type=INT32}]}, payload=[102121, "Python", "Py", 25]}
```
#### Mentioned DataBase Information and all SourceDataEntry
@@ -73,3 +73,110 @@ Field{index=2, name='last', type=STRING}, Field{index=3, name='age', type=INT32}
![sourcedataentry.png](https://github.com/yuchenlichuck/picture/blob/master/sourcedataentry.png?raw=true)
+**启动Connector**
+
+[http://127.0.0.1:8081/connectors/connector-name?config={"connector-class":"org.apache.rocketmq.connect.kafka.connector.KafkaSourceConnector","oms-driver-url":"oms](http://127.0.0.1:8081/connectors/connector-name?config=%7B%22connector-class%22:%22org.apache.rocketmq.connect.kafka.connector.KafkaSourceConnector%22,%22oms-driver-url%22:%22oms): rocketmq://127.0.0.1:9876/default:default","tasks.num":"1","kafka.topics":"test1,test2","kafka.group.id":"group0","kafka.bootstrap.server":"127.0. [...]
+
+**查看Connector运行状态**
+
+<http://127.0.0.1:8081/connectors/connector-name/status>
+
+**查看Connector配置**
+
+<http://127.0.0.1:8081/connectors/connector-name/config>
+
+**关闭Connector**
+
+<http://127.0.0.1:8081/connectors/connector-name/stop>
+
+
+
+
+
+
+
+# JDBC Connector 构建
+
+![dataflow](https://github.com/openmessaging/openmessaging-connect/raw/master/flow.png)
+
+#### 一、下载rocketmq-connect-runtime
+
+```
+1、git clone https://github.com/apache/rocketmq-externals.git
+
+2、cd rocketmq-externals/rocketmq-connect-runtime
+
+3、mvn -Dmaven.test.skip=true package
+
+4、cd target/distribution/conf
+```
+
+- a、修改connect.conf配置文件
+
+```
+#1、rocketmq 配置
+namesrvAddr=127.0.0.1:9876
+
+#2、file-connect jar包路径
+pluginPaths=/home/connect/file-connect/target
+
+#3、runtime持久化文件目录
+storePathRootDir=/home/connect/storeRoot
+
+#4、http服务端口
+httpPort=8081
+```
+
+
+
+
+
+- b、日志相关配置在logback.xml中修改
+
+```
+注:rocketmq需要先创建cluster-topic,config-topic,offset-topic,position-topic
+4个topic,并且为了保证消息有序,每个topic可以只一个queue
+```
+
+### 二、启动Connector
+
+1、启动runtime
+回到rocketmq-externals/rocketmq-connect-runtime目录
+
+```
+./run_worker.sh
+```
+
+看到日志目录查看connect_runtime.log
+
+如果看到以下日志说明runttiime启动成功了
+
+2019-07-16 10:56:24 INFO RebalanceService - RebalanceService service started
+2019-07-16 10:56:24 INFO main - The worker [DEFAULT_WORKER_1] boot success.
+
+2、启动sourceConnector
+
+ 正在做测试(To be continued)已实现Bulk Mode
+
+cd target/distribution/
+
+java -cp .;./conf/;./lib/* org.apache.rocketmq.connect.runtime.ConnectStartup -c conf/connect.conf
+
+
+
+在http中输入Get 请求
+
+
+
+示例
+
+[http://127.0.0.1:8085/connectors/testSourceConnector1?config={"connector-class":"org.apache.rocketmq.connect.jdbc.connector.JdbcSourceConnector","jdbcUrl":"127.0.0.1:3306","jdbcUsername":"root","jdbcPassword":"123456","task-class":"org.apache.rocketmq.connect.jdbc.connector.JdbcSourceConnector","rocketmqTopic":"jdbcTopic","mode":"bulk","source-record-converter":"org.apache.rocketmq.connect.runtime.converter.JsonConverter"}](http://127.0.0.1:8085/connectors/testSourceConnector1?config={% [...]
+
+
+
+
+
+
+
+
+
diff --git a/lib/mysql-connector-java-8.0.11.jar b/lib/mysql-connector-java-8.0.11.jar
new file mode 100644
index 0000000..c8b8b3f
Binary files /dev/null and b/lib/mysql-connector-java-8.0.11.jar differ
diff --git a/pom.xml b/pom.xml
index dee7710..830f9ed 100644
--- a/pom.xml
+++ b/pom.xml
@@ -127,6 +127,29 @@
<artifactId>findbugs-maven-plugin</artifactId>
<version>3.0.4</version>
</plugin>
+ <plugin>
+ <artifactId>maven-assembly-plugin</artifactId>
+ <version>3.0.0</version>
+ <configuration>
+ <archive>
+ <manifest>
+ <mainClass>org.apache.rocketmq.connect.jdbc.jdbcSourceConnector</mainClass>
+ </manifest>
+ </archive>
+ <descriptorRefs>
+ <descriptorRef>jar-with-dependencies</descriptorRef>
+ </descriptorRefs>
+ </configuration>
+ <executions>
+ <execution>
+ <id>make-assembly</id>
+ <phase>package</phase>
+ <goals>
+ <goal>single</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
</plugins>
</build>
@@ -159,6 +182,11 @@
<artifactId>openmessaging-connector</artifactId>
<version>0.1.0-beta</version>
</dependency>
+ <dependency>
+ <groupId>io.openmessaging</groupId>
+ <artifactId>openmessaging-api</artifactId>
+ <version>0.3.1-alpha</version>
+ </dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
@@ -194,7 +222,16 @@
<artifactId>commons-cli</artifactId>
<version>1.2</version>
</dependency>
-
+ <dependency>
+ <groupId>mysql</groupId>
+ <artifactId>mysql-connector-java</artifactId>
+ <version>8.0.11</version>
+ </dependency>
+ <dependency>
+ <groupId>io.javalin</groupId>
+ <artifactId>javalin</artifactId>
+ <version>1.3.0</version>
+ </dependency>
</dependencies>
diff --git a/src/main/java/org/apache/rocketmq/connect/jdbc/Config.java b/src/main/java/org/apache/rocketmq/connect/jdbc/Config.java
index 69ff9b0..4f7456b 100644
--- a/src/main/java/org/apache/rocketmq/connect/jdbc/Config.java
+++ b/src/main/java/org/apache/rocketmq/connect/jdbc/Config.java
@@ -61,19 +61,20 @@ public class Config {
public String dbTimezone="UTC";
public String queueName;
+ private Logger log = LoggerFactory.getLogger(Config.class);
public static final Set<String> REQUEST_CONFIG = new HashSet<String>() {
{
- add("jdbcUrl");
- add("jdbcUsername");
- add("jdbcPassword");
- add("mode");
- add("rocketmqTopic");
+ // add("jdbcUrl");
+ // add("jdbcUsername");
+ // add("jdbcPassword");
+ // add("mode");
+ // add("rocketmqTopic");
}
};
public void load(KeyValue props) {
-
+ log.info("Config.load.start");
properties2Object(props, this);
}
diff --git a/src/main/java/org/apache/rocketmq/connect/jdbc/connector/JdbcSourceConnector.java b/src/main/java/org/apache/rocketmq/connect/jdbc/connector/JdbcSourceConnector.java
index 8a6047c..8c30a62 100644
--- a/src/main/java/org/apache/rocketmq/connect/jdbc/connector/JdbcSourceConnector.java
+++ b/src/main/java/org/apache/rocketmq/connect/jdbc/connector/JdbcSourceConnector.java
@@ -26,7 +26,7 @@ import io.openmessaging.connector.api.Task;
import io.openmessaging.connector.api.source.SourceConnector;
import org.apache.rocketmq.connect.jdbc.Config;
-//import org.apache.rocketmq.connect.jdbc.connector.JdbcSourceTask;
+import org.apache.rocketmq.connect.jdbc.connector.JdbcSourceTask;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -36,13 +36,16 @@ public class JdbcSourceConnector extends SourceConnector {
@Override
public String verifyAndSetConfig(KeyValue config) {
- log.info("JdbcSourceConnector verifyAndSetConfig enter");
+
+ log.info("1216123 JdbcSourceConnector verifyAndSetConfig enter");
for (String requestKey : Config.REQUEST_CONFIG) {
+
if (!config.containsKey(requestKey)) {
return "Request config key: " + requestKey;
}
}
this.config = config;
+
return "";
}
@@ -71,9 +74,9 @@ public class JdbcSourceConnector extends SourceConnector {
@Override
public List<KeyValue> taskConfigs() {
+ log.info("List.start");
List<KeyValue> config = new ArrayList<>();
config.add(this.config);
return config;
}
-
}
diff --git a/src/main/java/org/apache/rocketmq/connect/jdbc/connector/JdbcSourceTask.java b/src/main/java/org/apache/rocketmq/connect/jdbc/connector/JdbcSourceTask.java
index 32ea763..78f1809 100644
--- a/src/main/java/org/apache/rocketmq/connect/jdbc/connector/JdbcSourceTask.java
+++ b/src/main/java/org/apache/rocketmq/connect/jdbc/connector/JdbcSourceTask.java
@@ -25,7 +25,6 @@ import java.util.Collection;
import java.util.LinkedList;
import java.util.List;
import org.apache.rocketmq.connect.jdbc.Config;
-import org.apache.rocketmq.connect.jdbc.Replicator;
import org.apache.rocketmq.connect.jdbc.schema.Table;
import org.apache.rocketmq.connect.jdbc.source.Querier;
import org.apache.rocketmq.connect.jdbc.schema.column.*;
@@ -47,8 +46,6 @@ public class JdbcSourceTask extends SourceTask {
private static final Logger log = LoggerFactory.getLogger(JdbcSourceTask.class);
- private Replicator replicator;
-
private Config config;
private List<Table> list=new LinkedList<>();
@@ -58,15 +55,20 @@ public class JdbcSourceTask extends SourceTask {
public Collection<SourceDataEntry> poll() {
List<SourceDataEntry> res = new ArrayList<>();
try {
+
JSONObject jsonObject = new JSONObject();
jsonObject.put("nextQuery", "database");
jsonObject.put("nextPosition", "10");
//To be Continued
+ log.info("querier.poll");
querier.poll();
- System.out.println(querier.getList().size());
+ log.info("1216connector.start");
+ int mm=0;
for(Table dataRow : querier.getList()){
System.out.println(dataRow.getColList().get(0));
- Schema schema = new Schema();
+ log.info("xunhuankaishi");
+ log.info("Received {} record: {} ", dataRow.getColList().get(0), mm++);
+ Schema schema = new Schema();
schema.setDataSource(dataRow.getDatabase());
schema.setName(dataRow.getName());
schema.setFields(new ArrayList<>());
@@ -102,7 +104,9 @@ public class JdbcSourceTask extends SourceTask {
try {
this.config = new Config();
this.config.load(props);
+ log.info("querier.start");
querier.start();
+
} catch (Exception e) {
log.error("JDBC task start failed.", e);
}
@@ -110,7 +114,7 @@ public class JdbcSourceTask extends SourceTask {
@Override
public void stop() {
- replicator.stop();
+ querier.stop();
}
@Override public void pause() {
diff --git a/src/main/java/org/apache/rocketmq/connect/jdbc/schema/Database.java b/src/main/java/org/apache/rocketmq/connect/jdbc/schema/Database.java
index 15fb77b..b88661d 100644
--- a/src/main/java/org/apache/rocketmq/connect/jdbc/schema/Database.java
+++ b/src/main/java/org/apache/rocketmq/connect/jdbc/schema/Database.java
@@ -87,8 +87,6 @@ public class Database {
private void addTable(String tableName) {
- // LOGGER.info("Schema load -- DATABASE:{},\tTABLE:{}", name, tableName);
-
Table table = new Table(name, tableName);
tableMap.put(tableName, table);
}
diff --git a/src/main/java/org/apache/rocketmq/connect/jdbc/source/Querier.java b/src/main/java/org/apache/rocketmq/connect/jdbc/source/Querier.java
index e99da74..1f630ea 100644
--- a/src/main/java/org/apache/rocketmq/connect/jdbc/source/Querier.java
+++ b/src/main/java/org/apache/rocketmq/connect/jdbc/source/Querier.java
@@ -65,7 +65,7 @@ public class Querier {
return connection;
}
- public void close() {
+ public void stop() {
Connection conn;
while ((conn = connections.poll()) != null) {
try {
@@ -114,7 +114,6 @@ public class Querier {
private Schema schema;
- private Map<Long, Table> tableMap = new HashMap<>();
public void poll() {
try {
@@ -125,6 +124,8 @@ public class Querier {
for (Map.Entry<String, Database> entry : schema.dbMap.entrySet()) {
String db = entry.getKey();
+ if(!db.contains("jdbc_db"))
+ continue;
Iterator<Map.Entry<String, Table>> iterator = entry.getValue().tableMap.entrySet().iterator();
while (iterator.hasNext()) {
Map.Entry<String, Table> tableEntry = iterator.next();
@@ -163,6 +164,7 @@ public class Querier {
initDataSource();
schema = new Schema(dataSource);
schema.load();
+ log.info("schema load successful");
}
private void initDataSource() throws Exception {
@@ -179,7 +181,9 @@ public class Querier {
map.put("minEvictableIdleTimeMillis", "300000");
map.put("validationQuery", "SELECT 1 FROM DUAL");
map.put("testWhileIdle", "true");
+ log.info("{},config read successful",map);
dataSource = DruidDataSourceFactory.createDataSource(map);
+
}
}
diff --git a/src/test/java/org/apache/rocketmq/connect/jdbc/ReplicatorTest.java b/src/test/java/org/apache/rocketmq/connect/jdbc/ReplicatorTest.java
new file mode 100644
index 0000000..88d5586
--- /dev/null
+++ b/src/test/java/org/apache/rocketmq/connect/jdbc/ReplicatorTest.java
@@ -0,0 +1,74 @@
+///*
+// * Licensed to the Apache Software Foundation (ASF) under one or more
+// * contributor license agreements. See the NOTICE file distributed with
+// * this work for additional information regarding copyright ownership.
+// * The ASF licenses this file to You under the Apache License, Version 2.0
+// * (the "License"); you may not use this file except in compliance with
+// * the License. You may obtain a copy of the License at
+// *
+// * http://www.apache.org/licenses/LICENSE-2.0
+// *
+// * Unless required by applicable law or agreed to in writing, software
+// * distributed under the License is distributed on an "AS IS" BASIS,
+// * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// * See the License for the specific language governing permissions and
+// * limitations under the License.
+// */
+//
+//package org.apache.rocketmq.connect.jms;
+//
+//import java.lang.reflect.Field;
+//
+//import javax.jms.Message;
+//
+//import org.apache.activemq.command.ActiveMQTextMessage;
+//import org.apache.rocketmq.connect.jms.pattern.PatternProcessor;
+//import org.junit.Before;
+//import org.junit.Test;
+//import org.mockito.Mockito;
+//
+//import org.junit.Assert;
+//
+//public class ReplicatorTest {
+//
+// Replicator replicator;
+//
+// PatternProcessor patternProcessor;
+//
+// Config config;
+//
+// @Before
+// public void before() throws IllegalArgumentException, IllegalAccessException, NoSuchFieldException, SecurityException {
+// config = new Config();
+// replicator = new Replicator(config,null);
+//
+// patternProcessor = Mockito.mock(PatternProcessor.class);
+//
+// Field processor = Replicator.class.getDeclaredField("processor");
+// processor.setAccessible(true);
+// processor.set(replicator, patternProcessor);
+// }
+//
+// @Test(expected = RuntimeException.class)
+// public void startTest() throws Exception {
+// replicator.start();
+// }
+//
+// @Test
+// public void stop() throws Exception {
+// replicator.stop();
+// Mockito.verify(patternProcessor, Mockito.times(1)).stop();
+// }
+//
+// @Test
+// public void commitAddGetQueueTest() {
+// Message message = new ActiveMQTextMessage();
+// replicator.commit(message, false);
+// Assert.assertEquals(replicator.getQueue().poll(), message);
+// }
+//
+// @Test
+// public void getConfigTest() {
+// Assert.assertEquals(replicator.getConfig(), config);
+// }
+//}
diff --git a/src/test/java/org/apache/rocketmq/connect/jdbc/connector/JdbcSourceConnectorTest.java b/src/test/java/org/apache/rocketmq/connect/jdbc/connector/JdbcSourceConnectorTest.java
index 97d87ee..297d517 100644
--- a/src/test/java/org/apache/rocketmq/connect/jdbc/connector/JdbcSourceConnectorTest.java
+++ b/src/test/java/org/apache/rocketmq/connect/jdbc/connector/JdbcSourceConnectorTest.java
@@ -38,8 +38,6 @@ public class JdbcSourceConnectorTest {
add("jdbcUrl");
add("jdbcUsername");
add("jdbcPassword");
- add("mode");
- add("rocketmqTopic");
}
};
@@ -52,7 +50,9 @@ public class JdbcSourceConnectorTest {
}
-
+// Set<String> getRequiredConfig() {
+// return REQUEST_CONFIG;
+// }
};
@Test