You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by zh...@apache.org on 2022/03/02 03:20:19 UTC

[rocketmq-connect] 11/43: Add JdbcSourceTask and Schema

This is an automated email from the ASF dual-hosted git repository.

zhoubo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-connect.git

commit 8ecde453ec4680fd8b37538bd7bd6312cce7fc00
Author: yuchenlichuck <yu...@126.com>
AuthorDate: Fri Aug 2 00:20:44 2019 +0800

    Add JdbcSourceTask and Schema
---
 README.md                                          | 113 ++++++++++++++++++++-
 lib/mysql-connector-java-8.0.11.jar                | Bin 0 -> 2036609 bytes
 pom.xml                                            |  39 ++++++-
 .../org/apache/rocketmq/connect/jdbc/Config.java   |  13 +--
 .../jdbc/connector/JdbcSourceConnector.java        |   9 +-
 .../connect/jdbc/connector/JdbcSourceTask.java     |  16 +--
 .../rocketmq/connect/jdbc/schema/Database.java     |   2 -
 .../rocketmq/connect/jdbc/source/Querier.java      |   8 +-
 .../rocketmq/connect/jdbc/ReplicatorTest.java      |  74 ++++++++++++++
 .../jdbc/connector/JdbcSourceConnectorTest.java    |   6 +-
 10 files changed, 254 insertions(+), 26 deletions(-)

diff --git a/README.md b/README.md
index f02d838..96da884 100644
--- a/README.md
+++ b/README.md
@@ -60,9 +60,9 @@
 - For example
 
 ```javascript
-SourceDataEntry{sourcePartition=java.nio.HeapByteBuffer[pos=0 lim=14 cap=14], sourcePosition=java.nio.HeapByteBuffer[pos=0 lim=44 cap=44]} DataEntry{timestamp=1564397062419, entryType=CREATE, queueName='student', shardingKey='null', 
-schema=Schema{dataSource='jdbc_db', name='student', fields=[Field{index=0, name='id', type=INT32}, Field{index=1, name='first', type=STRING}, 
-Field{index=2, name='last', type=STRING}, Field{index=3, name='age', type=INT32}]}, payload=[102121, "Python", "Py", 25]}
+    SourceDataEntry{sourcePartition=java.nio.HeapByteBuffer[pos=0 lim=14 cap=14], sourcePosition=java.nio.HeapByteBuffer[pos=0 lim=44 cap=44]} DataEntry{timestamp=1564397062419, entryType=CREATE, queueName='student', shardingKey='null', 
+    schema=Schema{dataSource='jdbc_db', name='student', fields=[Field{index=0, name='id', type=INT32}, Field{index=1, name='first', type=STRING}, 
+    Field{index=2, name='last', type=STRING}, Field{index=3, name='age', type=INT32}]}, payload=[102121, "Python", "Py", 25]}
 ```
 
 #### Mentioned DataBase Information and all SourceDataEntry
@@ -73,3 +73,110 @@ Field{index=2, name='last', type=STRING}, Field{index=3, name='age', type=INT32}
 
 ![sourcedataentry.png](https://github.com/yuchenlichuck/picture/blob/master/sourcedataentry.png?raw=true)
 
+**启动Connector**
+
+[http://127.0.0.1:8081/connectors/connector-name?config={"connector-class":"org.apache.rocketmq.connect.kafka.connector.KafkaSourceConnector","oms-driver-url":"oms](http://127.0.0.1:8081/connectors/connector-name?config=%7B%22connector-class%22:%22org.apache.rocketmq.connect.kafka.connector.KafkaSourceConnector%22,%22oms-driver-url%22:%22oms): rocketmq://127.0.0.1:9876/default:default","tasks.num":"1","kafka.topics":"test1,test2","kafka.group.id":"group0","kafka.bootstrap.server":"127.0. [...]
+
+**查看Connector运行状态**
+
+<http://127.0.0.1:8081/connectors/connector-name/status>
+
+**查看Connector配置**
+
+<http://127.0.0.1:8081/connectors/connector-name/config>
+
+**关闭Connector**
+
+<http://127.0.0.1:8081/connectors/connector-name/stop>
+
+
+
+
+
+
+
+# JDBC Connector 构建
+
+![dataflow](https://github.com/openmessaging/openmessaging-connect/raw/master/flow.png)
+
+#### 一、下载rocketmq-connect-runtime
+
+```
+1、git clone https://github.com/apache/rocketmq-externals.git
+
+2、cd rocketmq-externals/rocketmq-connect-runtime
+
+3、mvn -Dmaven.test.skip=true package
+
+4、cd target/distribution/conf
+```
+
+- a、修改connect.conf配置文件
+
+```
+#1、rocketmq 配置
+namesrvAddr=127.0.0.1:9876
+   
+#2、file-connect jar包路径
+pluginPaths=/home/connect/file-connect/target
+   
+#3、runtime持久化文件目录
+storePathRootDir=/home/connect/storeRoot
+   
+#4、http服务端口
+httpPort=8081
+```
+
+   
+
+
+
+- b、日志相关配置在logback.xml中修改
+
+```
+注:rocketmq需要先创建cluster-topic、config-topic、offset-topic、position-topic
+这4个topic,并且为了保证消息有序,每个topic可以只设置一个queue
+```
+
+### 二、启动Connector
+
+1、启动runtime
+回到rocketmq-externals/rocketmq-connect-runtime目录
+
+```
+./run_worker.sh
+```
+
+进入日志目录,查看connect_runtime.log
+
+如果看到以下日志,说明runtime启动成功了
+
+2019-07-16 10:56:24 INFO RebalanceService - RebalanceService service started
+2019-07-16 10:56:24 INFO main - The worker [DEFAULT_WORKER_1] boot success.
+
+2、启动sourceConnector
+
+​	正在做测试(To be continued)已实现Bulk Mode
+
+cd target/distribution/
+
+java -cp .;./conf/;./lib/* org.apache.rocketmq.connect.runtime.ConnectStartup -c conf/connect.conf
+
+
+
+在http中输入Get 请求
+
+
+
+示例
+
+[http://127.0.0.1:8085/connectors/testSourceConnector1?config={"connector-class":"org.apache.rocketmq.connect.jdbc.connector.JdbcSourceConnector","jdbcUrl":"127.0.0.1:3306","jdbcUsername":"root","jdbcPassword":"123456","task-class":"org.apache.rocketmq.connect.jdbc.connector.JdbcSourceConnector","rocketmqTopic":"jdbcTopic","mode":"bulk","source-record-converter":"org.apache.rocketmq.connect.runtime.converter.JsonConverter"}](http://127.0.0.1:8085/connectors/testSourceConnector1?config={% [...]
+
+
+
+
+
+
+
+
+
diff --git a/lib/mysql-connector-java-8.0.11.jar b/lib/mysql-connector-java-8.0.11.jar
new file mode 100644
index 0000000..c8b8b3f
Binary files /dev/null and b/lib/mysql-connector-java-8.0.11.jar differ
diff --git a/pom.xml b/pom.xml
index dee7710..830f9ed 100644
--- a/pom.xml
+++ b/pom.xml
@@ -127,6 +127,29 @@
                 <artifactId>findbugs-maven-plugin</artifactId>
                 <version>3.0.4</version>
             </plugin>
+			 <plugin>
+            <artifactId>maven-assembly-plugin</artifactId>
+            <version>3.0.0</version>
+            <configuration>
+                <archive>
+                    <manifest>
+                        <mainClass>org.apache.rocketmq.connect.jdbc.jdbcSourceConnector</mainClass>
+                    </manifest>
+                </archive>
+                <descriptorRefs>
+                    <descriptorRef>jar-with-dependencies</descriptorRef>
+                </descriptorRefs>
+            </configuration>
+            <executions>
+                <execution>
+                    <id>make-assembly</id>
+                    <phase>package</phase>
+                    <goals>
+                        <goal>single</goal>
+                    </goals>
+                </execution>
+            </executions>
+			</plugin>
         </plugins>
     </build>
 
@@ -159,6 +182,11 @@
             <artifactId>openmessaging-connector</artifactId>
             <version>0.1.0-beta</version>
         </dependency>
+		<dependency>
+			<groupId>io.openmessaging</groupId>
+			<artifactId>openmessaging-api</artifactId>
+			<version>0.3.1-alpha</version>
+		</dependency>
         <dependency>
             <groupId>com.alibaba</groupId>
             <artifactId>fastjson</artifactId>
@@ -194,7 +222,16 @@
             <artifactId>commons-cli</artifactId>
             <version>1.2</version>
         </dependency>
-
+		<dependency>
+			<groupId>mysql</groupId>
+			<artifactId>mysql-connector-java</artifactId>
+			<version>8.0.11</version>
+		</dependency>
+		<dependency>
+			<groupId>io.javalin</groupId>
+			<artifactId>javalin</artifactId>
+			<version>1.3.0</version>
+		</dependency>		
 
     </dependencies>
 
diff --git a/src/main/java/org/apache/rocketmq/connect/jdbc/Config.java b/src/main/java/org/apache/rocketmq/connect/jdbc/Config.java
index 69ff9b0..4f7456b 100644
--- a/src/main/java/org/apache/rocketmq/connect/jdbc/Config.java
+++ b/src/main/java/org/apache/rocketmq/connect/jdbc/Config.java
@@ -61,19 +61,20 @@ public class Config {
     public String dbTimezone="UTC";
     public String queueName;
 
+    private Logger log = LoggerFactory.getLogger(Config.class);
     public static final Set<String> REQUEST_CONFIG = new HashSet<String>() {
         {
-            add("jdbcUrl");
-            add("jdbcUsername");
-            add("jdbcPassword");
-            add("mode");
-            add("rocketmqTopic");
+          //  add("jdbcUrl");
+          //  add("jdbcUsername");
+         //   add("jdbcPassword");
+        //    add("mode");
+        //    add("rocketmqTopic");
         }
     };
 
 
     public void load(KeyValue props) {
-
+			log.info("Config.load.start");
         properties2Object(props, this);
     }
 
diff --git a/src/main/java/org/apache/rocketmq/connect/jdbc/connector/JdbcSourceConnector.java b/src/main/java/org/apache/rocketmq/connect/jdbc/connector/JdbcSourceConnector.java
index 8a6047c..8c30a62 100644
--- a/src/main/java/org/apache/rocketmq/connect/jdbc/connector/JdbcSourceConnector.java
+++ b/src/main/java/org/apache/rocketmq/connect/jdbc/connector/JdbcSourceConnector.java
@@ -26,7 +26,7 @@ import io.openmessaging.connector.api.Task;
 import io.openmessaging.connector.api.source.SourceConnector;
 
 import org.apache.rocketmq.connect.jdbc.Config;
-//import org.apache.rocketmq.connect.jdbc.connector.JdbcSourceTask;
+import org.apache.rocketmq.connect.jdbc.connector.JdbcSourceTask;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -36,13 +36,16 @@ public class JdbcSourceConnector extends SourceConnector {
 
     @Override
     public String verifyAndSetConfig(KeyValue config) {
-        log.info("JdbcSourceConnector verifyAndSetConfig enter");
+
+        log.info("1216123 JdbcSourceConnector verifyAndSetConfig enter");
         for (String requestKey : Config.REQUEST_CONFIG) {
+
             if (!config.containsKey(requestKey)) {
                 return "Request config key: " + requestKey;
             }
         }
         this.config = config;
+	
         return "";
     }
 
@@ -71,9 +74,9 @@ public class JdbcSourceConnector extends SourceConnector {
 
     @Override
     public List<KeyValue> taskConfigs() {
+					log.info("List.start");
         List<KeyValue> config = new ArrayList<>();
         config.add(this.config);
         return config;
     }
-
 }
diff --git a/src/main/java/org/apache/rocketmq/connect/jdbc/connector/JdbcSourceTask.java b/src/main/java/org/apache/rocketmq/connect/jdbc/connector/JdbcSourceTask.java
index 32ea763..78f1809 100644
--- a/src/main/java/org/apache/rocketmq/connect/jdbc/connector/JdbcSourceTask.java
+++ b/src/main/java/org/apache/rocketmq/connect/jdbc/connector/JdbcSourceTask.java
@@ -25,7 +25,6 @@ import java.util.Collection;
 import java.util.LinkedList;
 import java.util.List;
 import org.apache.rocketmq.connect.jdbc.Config;
-import org.apache.rocketmq.connect.jdbc.Replicator;
 import org.apache.rocketmq.connect.jdbc.schema.Table;
 import org.apache.rocketmq.connect.jdbc.source.Querier;
 import org.apache.rocketmq.connect.jdbc.schema.column.*;
@@ -47,8 +46,6 @@ public class JdbcSourceTask extends SourceTask {
 
     private static final Logger log = LoggerFactory.getLogger(JdbcSourceTask.class);
 
-    private Replicator replicator;
-
     private Config config;
     
 	private List<Table> list=new LinkedList<>();
@@ -58,15 +55,20 @@ public class JdbcSourceTask extends SourceTask {
     public Collection<SourceDataEntry> poll() {
         List<SourceDataEntry> res = new ArrayList<>();
         try {
+            
             JSONObject jsonObject = new JSONObject();
             jsonObject.put("nextQuery", "database");
             jsonObject.put("nextPosition", "10");
         	//To be Continued
+			log.info("querier.poll");
             querier.poll();
-    		System.out.println(querier.getList().size());
+									log.info("1216connector.start");
+			int mm=0;
             for(Table dataRow : querier.getList()){
             	System.out.println(dataRow.getColList().get(0));
-                Schema schema = new Schema();
+										log.info("xunhuankaishi");
+                log.info("Received {} record: {} ", dataRow.getColList().get(0), mm++);
+				Schema schema = new Schema();
                 schema.setDataSource(dataRow.getDatabase());
                 schema.setName(dataRow.getName());
                 schema.setFields(new ArrayList<>());
@@ -102,7 +104,9 @@ public class JdbcSourceTask extends SourceTask {
         try {
             this.config = new Config();
             this.config.load(props);
+						log.info("querier.start");
     		querier.start();
+
         } catch (Exception e) {
             log.error("JDBC task start failed.", e);
         }
@@ -110,7 +114,7 @@ public class JdbcSourceTask extends SourceTask {
 
     @Override
     public void stop() {
-        replicator.stop();
+        querier.stop();
     }
 
     @Override public void pause() {
diff --git a/src/main/java/org/apache/rocketmq/connect/jdbc/schema/Database.java b/src/main/java/org/apache/rocketmq/connect/jdbc/schema/Database.java
index 15fb77b..b88661d 100644
--- a/src/main/java/org/apache/rocketmq/connect/jdbc/schema/Database.java
+++ b/src/main/java/org/apache/rocketmq/connect/jdbc/schema/Database.java
@@ -87,8 +87,6 @@ public class Database {
 
     private void addTable(String tableName) {
 
-   //     LOGGER.info("Schema load -- DATABASE:{},\tTABLE:{}", name, tableName);
-
         Table table = new Table(name, tableName);
         tableMap.put(tableName, table);
     }
diff --git a/src/main/java/org/apache/rocketmq/connect/jdbc/source/Querier.java b/src/main/java/org/apache/rocketmq/connect/jdbc/source/Querier.java
index e99da74..1f630ea 100644
--- a/src/main/java/org/apache/rocketmq/connect/jdbc/source/Querier.java
+++ b/src/main/java/org/apache/rocketmq/connect/jdbc/source/Querier.java
@@ -65,7 +65,7 @@ public class Querier {
 		return connection;
 	}
 
-	public void close() {
+	public void stop() {
 		Connection conn;
 		while ((conn = connections.poll()) != null) {
 			try {
@@ -114,7 +114,6 @@ public class Querier {
 
 	private Schema schema;
 
-	private Map<Long, Table> tableMap = new HashMap<>();
 
 	public void poll() {
 		try {
@@ -125,6 +124,8 @@ public class Querier {
 
 			for (Map.Entry<String, Database> entry : schema.dbMap.entrySet()) {
 				String db = entry.getKey();
+								if(!db.contains("jdbc_db"))
+					continue;
 				Iterator<Map.Entry<String, Table>> iterator = entry.getValue().tableMap.entrySet().iterator();
 				while (iterator.hasNext()) {
 					Map.Entry<String, Table> tableEntry = iterator.next();
@@ -163,6 +164,7 @@ public class Querier {
 		initDataSource();
 		schema = new Schema(dataSource);
 		schema.load();
+		log.info("schema load successful");
 	}
 
 	private void initDataSource() throws Exception {
@@ -179,7 +181,9 @@ public class Querier {
 		map.put("minEvictableIdleTimeMillis", "300000");
 		map.put("validationQuery", "SELECT 1 FROM DUAL");
 		map.put("testWhileIdle", "true");
+		log.info("{},config read successful",map);
 		dataSource = DruidDataSourceFactory.createDataSource(map);
+
 	}
 
 }
diff --git a/src/test/java/org/apache/rocketmq/connect/jdbc/ReplicatorTest.java b/src/test/java/org/apache/rocketmq/connect/jdbc/ReplicatorTest.java
new file mode 100644
index 0000000..88d5586
--- /dev/null
+++ b/src/test/java/org/apache/rocketmq/connect/jdbc/ReplicatorTest.java
@@ -0,0 +1,74 @@
+///*
+// * Licensed to the Apache Software Foundation (ASF) under one or more
+// * contributor license agreements.  See the NOTICE file distributed with
+// * this work for additional information regarding copyright ownership.
+// * The ASF licenses this file to You under the Apache License, Version 2.0
+// * (the "License"); you may not use this file except in compliance with
+// * the License.  You may obtain a copy of the License at
+// *
+// *     http://www.apache.org/licenses/LICENSE-2.0
+// *
+// * Unless required by applicable law or agreed to in writing, software
+// * distributed under the License is distributed on an "AS IS" BASIS,
+// * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// * See the License for the specific language governing permissions and
+// * limitations under the License.
+// */
+//
+//package org.apache.rocketmq.connect.jms;
+//
+//import java.lang.reflect.Field;
+//
+//import javax.jms.Message;
+//
+//import org.apache.activemq.command.ActiveMQTextMessage;
+//import org.apache.rocketmq.connect.jms.pattern.PatternProcessor;
+//import org.junit.Before;
+//import org.junit.Test;
+//import org.mockito.Mockito;
+//
+//import org.junit.Assert;
+//
+//public class ReplicatorTest {
+//
+//    Replicator replicator;
+//
+//    PatternProcessor patternProcessor;
+//
+//    Config config;
+//
+//    @Before
+//    public void before() throws IllegalArgumentException, IllegalAccessException, NoSuchFieldException, SecurityException {
+//        config = new Config();
+//        replicator = new Replicator(config,null);
+//
+//        patternProcessor = Mockito.mock(PatternProcessor.class);
+//
+//        Field processor = Replicator.class.getDeclaredField("processor");
+//        processor.setAccessible(true);
+//        processor.set(replicator, patternProcessor);
+//    }
+//
+//    @Test(expected = RuntimeException.class)
+//    public void startTest() throws Exception {
+//        replicator.start();
+//    }
+//
+//    @Test
+//    public void stop() throws Exception {
+//        replicator.stop();
+//        Mockito.verify(patternProcessor, Mockito.times(1)).stop();
+//    }
+//
+//    @Test
+//    public void commitAddGetQueueTest() {
+//        Message message = new ActiveMQTextMessage();
+//        replicator.commit(message, false);
+//        Assert.assertEquals(replicator.getQueue().poll(), message);
+//    }
+//
+//    @Test
+//    public void getConfigTest() {
+//        Assert.assertEquals(replicator.getConfig(), config);
+//    }
+//}
diff --git a/src/test/java/org/apache/rocketmq/connect/jdbc/connector/JdbcSourceConnectorTest.java b/src/test/java/org/apache/rocketmq/connect/jdbc/connector/JdbcSourceConnectorTest.java
index 97d87ee..297d517 100644
--- a/src/test/java/org/apache/rocketmq/connect/jdbc/connector/JdbcSourceConnectorTest.java
+++ b/src/test/java/org/apache/rocketmq/connect/jdbc/connector/JdbcSourceConnectorTest.java
@@ -38,8 +38,6 @@ public class JdbcSourceConnectorTest {
             add("jdbcUrl");
             add("jdbcUsername");
             add("jdbcPassword");
-            add("mode");
-            add("rocketmqTopic");
         }
     };
 	
@@ -52,7 +50,9 @@ public class JdbcSourceConnectorTest {
 		}
 
 
-
+//		Set<String> getRequiredConfig() {
+//			return REQUEST_CONFIG;
+//		}
 	};
 
     @Test