You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by zh...@apache.org on 2022/03/02 03:20:17 UTC
[rocketmq-connect] 09/43: Add JdbcSourceTask and Schema
This is an automated email from the ASF dual-hosted git repository.
zhoubo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-connect.git
commit e915f678c601912fec4b1f7438972f740ca5288b
Author: yuchenlichuck <yu...@126.com>
AuthorDate: Mon Jul 29 21:49:58 2019 +0800
Add JdbcSourceTask and Schema
---
pom.xml | 12 +++
.../apache/rocketmq/connect/jdbc/Replicator.java | 118 ---------------------
.../rocketmq/connect/jdbc/source/Querier.java | 2 -
.../jdbc/connector/JdbcSourceConnectorTest.java | 9 +-
.../connect/jdbc/connector/JdbcSourceTaskTest.java | 44 ++++++++
5 files changed, 60 insertions(+), 125 deletions(-)
diff --git a/pom.xml b/pom.xml
index a53b8b0..dee7710 100644
--- a/pom.xml
+++ b/pom.xml
@@ -150,6 +150,11 @@
<scope>test</scope>
</dependency>
<dependency>
+ <groupId>commons-codec</groupId>
+ <artifactId>commons-codec</artifactId>
+ <version>1.12</version>
+ </dependency>
+ <dependency>
<groupId>io.openmessaging</groupId>
<artifactId>openmessaging-connector</artifactId>
<version>0.1.0-beta</version>
@@ -179,11 +184,18 @@
<artifactId>rocketmq-openmessaging</artifactId>
<version>4.3.2</version>
</dependency>
+ <dependency>
+ <groupId>com.alibaba</groupId>
+ <artifactId>druid</artifactId>
+ <version>1.0.18</version>
+ </dependency>
<dependency>
<groupId>commons-cli</groupId>
<artifactId>commons-cli</artifactId>
<version>1.2</version>
</dependency>
+
+
</dependencies>
</project>
diff --git a/src/main/java/org/apache/rocketmq/connect/jdbc/Replicator.java b/src/main/java/org/apache/rocketmq/connect/jdbc/Replicator.java
deleted file mode 100644
index b24b7e5..0000000
--- a/src/main/java/org/apache/rocketmq/connect/jdbc/Replicator.java
+++ /dev/null
@@ -1,118 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.rocketmq.connect.jdbc;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.LinkedBlockingQueue;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class Replicator {
-
- private static final Logger LOGGER = LoggerFactory.getLogger(Replicator.class);
-
- private static final Logger POSITION_LOGGER = LoggerFactory.getLogger("PositionLogger");
-
- private Config config;
-
- private EventProcessor eventProcessor;
-
- private Object lock = new Object();
- private BinlogPosition nextBinlogPosition;
- private long nextQueueOffset;
- private long xid;
- private BlockingQueue<Transaction> queue = new LinkedBlockingQueue<>();
-
- public Replicator(Config config){
- this.config = config;
- }
-
- public void start() {
-
- try {
-
- eventProcessor = new EventProcessor(this);
- eventProcessor.start();
-
- } catch (Exception e) {
- LOGGER.error("Start error.", e);
- }
- }
-
- public void stop(){
- eventProcessor.stop();
- }
-
- public void commit(Transaction transaction, boolean isComplete) {
-
- queue.add(transaction);
- for (int i = 0; i < 3; i++) {
- try {
- if (isComplete) {
- long offset = 1;
- synchronized (lock) {
- xid = transaction.getXid();
- nextBinlogPosition = transaction.getNextBinlogPosition();
- nextQueueOffset = offset;
- }
-
- } else {
- }
- break;
-
- } catch (Exception e) {
- LOGGER.error("Push error,retry:" + (i + 1) + ",", e);
- }
- }
- }
-
- public void logPosition() {
-
- String binlogFilename = null;
- long xid = 0L;
- long nextPosition = 0L;
- long nextOffset = 0L;
-
- synchronized (lock) {
- if (nextBinlogPosition != null) {
- xid = this.xid;
- binlogFilename = nextBinlogPosition.getBinlogFilename();
- nextPosition = nextBinlogPosition.getPosition();
- nextOffset = nextQueueOffset;
- }
- }
-
- if (binlogFilename != null) {
- POSITION_LOGGER.info("XID: {}, BINLOG_FILE: {}, NEXT_POSITION: {}, NEXT_OFFSET: {}",
- xid, binlogFilename, nextPosition, nextOffset);
- }
-
- }
-
- public Config getConfig() {
- return config;
- }
-
-// public BinlogPosition getNextBinlogPosition() {
-// return nextBinlogPosition;
-// }
-
- public BlockingQueue<Transaction> getQueue() {
- return queue;
- }
-}
diff --git a/src/main/java/org/apache/rocketmq/connect/jdbc/source/Querier.java b/src/main/java/org/apache/rocketmq/connect/jdbc/source/Querier.java
index 61323d4..e99da74 100644
--- a/src/main/java/org/apache/rocketmq/connect/jdbc/source/Querier.java
+++ b/src/main/java/org/apache/rocketmq/connect/jdbc/source/Querier.java
@@ -125,8 +125,6 @@ public class Querier {
for (Map.Entry<String, Database> entry : schema.dbMap.entrySet()) {
String db = entry.getKey();
- if(!db.contains("jdbc_db"))
- continue;
Iterator<Map.Entry<String, Table>> iterator = entry.getValue().tableMap.entrySet().iterator();
while (iterator.hasNext()) {
Map.Entry<String, Table> tableEntry = iterator.next();
diff --git a/src/test/java/org/apache/rocketmq/connect/jdbc/connector/JdbcSourceConnectorTest.java b/src/test/java/org/apache/rocketmq/connect/jdbc/connector/JdbcSourceConnectorTest.java
index 79e4e59..97d87ee 100644
--- a/src/test/java/org/apache/rocketmq/connect/jdbc/connector/JdbcSourceConnectorTest.java
+++ b/src/test/java/org/apache/rocketmq/connect/jdbc/connector/JdbcSourceConnectorTest.java
@@ -35,10 +35,11 @@ public class JdbcSourceConnectorTest {
public static final Set<String> REQUEST_CONFIG = new HashSet<String>() {
{
- add("jdbcAddr");
- add("jdbcPort");
+ add("jdbcUrl");
add("jdbcUsername");
add("jdbcPassword");
+ add("mode");
+ add("rocketmqTopic");
}
};
@@ -51,9 +52,7 @@ public class JdbcSourceConnectorTest {
}
-// Set<String> getRequiredConfig() {
-// return REQUEST_CONFIG;
-// }
+
};
@Test
diff --git a/src/test/java/org/apache/rocketmq/connect/jdbc/connector/JdbcSourceTaskTest.java b/src/test/java/org/apache/rocketmq/connect/jdbc/connector/JdbcSourceTaskTest.java
new file mode 100644
index 0000000..f9c8c6f
--- /dev/null
+++ b/src/test/java/org/apache/rocketmq/connect/jdbc/connector/JdbcSourceTaskTest.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.connect.jdbc.connector;
+import java.util.Collection;
+import org.junit.Test;
+
+
+import io.openmessaging.KeyValue;
+import io.openmessaging.connector.api.data.SourceDataEntry;
+import io.openmessaging.internal.DefaultKeyValue;
+
+public class JdbcSourceTaskTest {
+
+
+ @Test
+ public void test() throws InterruptedException {
+ KeyValue kv = new DefaultKeyValue();
+ kv.put("jdbcUrl","localhost:3306");
+ kv.put("jdbcUsername","root");
+ kv.put("jdbcPassword","199812160");
+ kv.put("mode","bulk");
+ kv.put("rocketmqTopic","JdbcTopic");
+ JdbcSourceTask task = new JdbcSourceTask();
+ task.start(kv);
+ Collection<SourceDataEntry> sourceDataEntry = task.poll();
+ System.out.println(sourceDataEntry);
+
+ }
+}