You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by zh...@apache.org on 2022/03/02 03:20:17 UTC

[rocketmq-connect] 09/43: Add JdbcSourceTask and Schema

This is an automated email from the ASF dual-hosted git repository.

zhoubo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-connect.git

commit e915f678c601912fec4b1f7438972f740ca5288b
Author: yuchenlichuck <yu...@126.com>
AuthorDate: Mon Jul 29 21:49:58 2019 +0800

    Add JdbcSourceTask and Schema
---
 pom.xml                                            |  12 +++
 .../apache/rocketmq/connect/jdbc/Replicator.java   | 118 ---------------------
 .../rocketmq/connect/jdbc/source/Querier.java      |   2 -
 .../jdbc/connector/JdbcSourceConnectorTest.java    |   9 +-
 .../connect/jdbc/connector/JdbcSourceTaskTest.java |  44 ++++++++
 5 files changed, 60 insertions(+), 125 deletions(-)

diff --git a/pom.xml b/pom.xml
index a53b8b0..dee7710 100644
--- a/pom.xml
+++ b/pom.xml
@@ -150,6 +150,11 @@
             <scope>test</scope>
         </dependency>
         <dependency>
+    		<groupId>commons-codec</groupId>
+    		<artifactId>commons-codec</artifactId>
+    		<version>1.12</version>
+		</dependency>
+        <dependency>
             <groupId>io.openmessaging</groupId>
             <artifactId>openmessaging-connector</artifactId>
             <version>0.1.0-beta</version>
@@ -179,11 +184,18 @@
             <artifactId>rocketmq-openmessaging</artifactId>
             <version>4.3.2</version>
         </dependency>
+       <dependency>
+            <groupId>com.alibaba</groupId>
+            <artifactId>druid</artifactId>
+            <version>1.0.18</version>
+        </dependency>
         <dependency>
             <groupId>commons-cli</groupId>
             <artifactId>commons-cli</artifactId>
             <version>1.2</version>
         </dependency>
+
+
     </dependencies>
 
 </project>
diff --git a/src/main/java/org/apache/rocketmq/connect/jdbc/Replicator.java b/src/main/java/org/apache/rocketmq/connect/jdbc/Replicator.java
deleted file mode 100644
index b24b7e5..0000000
--- a/src/main/java/org/apache/rocketmq/connect/jdbc/Replicator.java
+++ /dev/null
@@ -1,118 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.rocketmq.connect.jdbc;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.LinkedBlockingQueue;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class Replicator {
-
-    private static final Logger LOGGER = LoggerFactory.getLogger(Replicator.class);
-
-    private static final Logger POSITION_LOGGER = LoggerFactory.getLogger("PositionLogger");
-
-    private Config config;
-
-    private EventProcessor eventProcessor;
-
-    private Object lock = new Object();
-    private BinlogPosition nextBinlogPosition;
-    private long nextQueueOffset;
-    private long xid;
-    private BlockingQueue<Transaction> queue = new LinkedBlockingQueue<>();
-
-    public Replicator(Config config){
-        this.config = config;
-    }
-
-    public void start() {
-
-        try {
-
-            eventProcessor = new EventProcessor(this);
-            eventProcessor.start();
-
-        } catch (Exception e) {
-            LOGGER.error("Start error.", e);
-        }
-    }
-
-    public void stop(){
-        eventProcessor.stop();
-    }
-
-    public void commit(Transaction transaction, boolean isComplete) {
-
-        queue.add(transaction);
-        for (int i = 0; i < 3; i++) {
-            try {
-                if (isComplete) {
-                    long offset = 1;
-                    synchronized (lock) {
-                        xid = transaction.getXid();
-                        nextBinlogPosition = transaction.getNextBinlogPosition();
-                        nextQueueOffset = offset;
-                    }
-
-                } else {
-                }
-                break;
-
-            } catch (Exception e) {
-                LOGGER.error("Push error,retry:" + (i + 1) + ",", e);
-            }
-        }
-    }
-
-    public void logPosition() {
-
-        String binlogFilename = null;
-        long xid = 0L;
-        long nextPosition = 0L;
-        long nextOffset = 0L;
-
-        synchronized (lock) {
-            if (nextBinlogPosition != null) {
-                xid = this.xid;
-                binlogFilename = nextBinlogPosition.getBinlogFilename();
-                nextPosition = nextBinlogPosition.getPosition();
-                nextOffset = nextQueueOffset;
-            }
-        }
-
-        if (binlogFilename != null) {
-            POSITION_LOGGER.info("XID: {},   BINLOG_FILE: {},   NEXT_POSITION: {},   NEXT_OFFSET: {}",
-                xid, binlogFilename, nextPosition, nextOffset);
-        }
-
-    }
-
-    public Config getConfig() {
-        return config;
-    }
-
-//    public BinlogPosition getNextBinlogPosition() {
-//        return nextBinlogPosition;
-//    }
-
-    public BlockingQueue<Transaction> getQueue() {
-        return queue;
-    }
-}
diff --git a/src/main/java/org/apache/rocketmq/connect/jdbc/source/Querier.java b/src/main/java/org/apache/rocketmq/connect/jdbc/source/Querier.java
index 61323d4..e99da74 100644
--- a/src/main/java/org/apache/rocketmq/connect/jdbc/source/Querier.java
+++ b/src/main/java/org/apache/rocketmq/connect/jdbc/source/Querier.java
@@ -125,8 +125,6 @@ public class Querier {
 
 			for (Map.Entry<String, Database> entry : schema.dbMap.entrySet()) {
 				String db = entry.getKey();
-				if(!db.contains("jdbc_db"))
-					continue;
 				Iterator<Map.Entry<String, Table>> iterator = entry.getValue().tableMap.entrySet().iterator();
 				while (iterator.hasNext()) {
 					Map.Entry<String, Table> tableEntry = iterator.next();
diff --git a/src/test/java/org/apache/rocketmq/connect/jdbc/connector/JdbcSourceConnectorTest.java b/src/test/java/org/apache/rocketmq/connect/jdbc/connector/JdbcSourceConnectorTest.java
index 79e4e59..97d87ee 100644
--- a/src/test/java/org/apache/rocketmq/connect/jdbc/connector/JdbcSourceConnectorTest.java
+++ b/src/test/java/org/apache/rocketmq/connect/jdbc/connector/JdbcSourceConnectorTest.java
@@ -35,10 +35,11 @@ public class JdbcSourceConnectorTest {
 
 	public static final Set<String> REQUEST_CONFIG = new HashSet<String>() {
         {
-            add("jdbcAddr");
-            add("jdbcPort");
+            add("jdbcUrl");
             add("jdbcUsername");
             add("jdbcPassword");
+            add("mode");
+            add("rocketmqTopic");
         }
     };
 	
@@ -51,9 +52,7 @@ public class JdbcSourceConnectorTest {
 		}
 
 
-//		Set<String> getRequiredConfig() {
-//			return REQUEST_CONFIG;
-//		}
+
 	};
 
     @Test
diff --git a/src/test/java/org/apache/rocketmq/connect/jdbc/connector/JdbcSourceTaskTest.java b/src/test/java/org/apache/rocketmq/connect/jdbc/connector/JdbcSourceTaskTest.java
new file mode 100644
index 0000000..f9c8c6f
--- /dev/null
+++ b/src/test/java/org/apache/rocketmq/connect/jdbc/connector/JdbcSourceTaskTest.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.connect.jdbc.connector;
+import java.util.Collection;
+import org.junit.Test;
+
+
+import io.openmessaging.KeyValue;
+import io.openmessaging.connector.api.data.SourceDataEntry;
+import io.openmessaging.internal.DefaultKeyValue;
+
+public class JdbcSourceTaskTest {
+
+
+    @Test
+    public void test() throws InterruptedException {
+        KeyValue kv = new DefaultKeyValue();
+        kv.put("jdbcUrl","localhost:3306");
+        kv.put("jdbcUsername","root");
+        kv.put("jdbcPassword","199812160");
+        kv.put("mode","bulk");
+        kv.put("rocketmqTopic","JdbcTopic");
+        JdbcSourceTask task = new JdbcSourceTask();
+        task.start(kv);
+            Collection<SourceDataEntry> sourceDataEntry = task.poll();
+            System.out.println(sourceDataEntry);
+
+    }
+}