You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by zh...@apache.org on 2022/03/02 03:20:16 UTC
[rocketmq-connect] 08/43: Add JdbcSourceTask
This is an automated email from the ASF dual-hosted git repository.
zhoubo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-connect.git
commit 5a83890b55770820325d32aa7c86df6b477c37b7
Author: yuchenlichuck <yu...@126.com>
AuthorDate: Mon Jul 29 21:20:48 2019 +0800
Add JdbcSourceTask
---
.../org/apache/rocketmq/connect/jdbc/Config.java | 12 +-
.../apache/rocketmq/connect/jdbc/Replicator.java | 118 +++++++++++++
.../connect/jdbc/connector/JdbcSourceTask.java | 107 +++++++++++-
.../rocketmq/connect/jdbc/schema/Database.java | 100 +++++++++++
.../rocketmq/connect/jdbc/schema/Schema.java | 128 ++++++++++++++
.../apache/rocketmq/connect/jdbc/schema/Table.java | 90 ++++++++++
.../column/BigIntColumnParser.java} | 40 ++++-
.../connect/jdbc/schema/column/ColumnParser.java | 104 ++++++++++++
.../jdbc/schema/column/DateTimeColumnParser.java | 53 ++++++
.../column/DefaultColumnParser.java} | 27 ++-
.../column/EnumColumnParser.java} | 36 +++-
.../jdbc/schema/column/IntColumnParser.java | 66 ++++++++
.../jdbc/schema/column/SetColumnParser.java | 54 ++++++
.../jdbc/schema/column/StringColumnParser.java | 57 +++++++
.../column/TimeColumnParser.java} | 29 +++-
.../column/YearColumnParser.java} | 30 +++-
.../rocketmq/connect/jdbc/source/Querier.java | 187 +++++++++++++++++++++
17 files changed, 1181 insertions(+), 57 deletions(-)
diff --git a/src/main/java/org/apache/rocketmq/connect/jdbc/Config.java b/src/main/java/org/apache/rocketmq/connect/jdbc/Config.java
index 217b449..69ff9b0 100644
--- a/src/main/java/org/apache/rocketmq/connect/jdbc/Config.java
+++ b/src/main/java/org/apache/rocketmq/connect/jdbc/Config.java
@@ -31,9 +31,9 @@ public class Config {
private static final Logger LOG = LoggerFactory.getLogger(Config.class);
/* Database Connection Config */
- public String jdbcUrl;
- public String jdbcUsername;
- public String jdbcPassword;
+ public String jdbcUrl="localhost:3306";
+ public String jdbcUsername="root";
+ // No default password: credentials must be supplied through the task configuration
+ // and must never be committed to source control.
+ public String jdbcPassword;
public String rocketmqTopic;
public String jdbcBackoff;
public String jdbcAttempts;
@@ -54,7 +54,7 @@ public class Config {
/*Connector config*/
public String tableTypes="table";
- public int pollInterval=5000;
+ public long pollInterval=5000;
public int batchMaxRows=100;
public long tablePollInterval=60000;
public long timestampDelayInterval=0;
@@ -67,7 +67,7 @@ public class Config {
add("jdbcUsername");
add("jdbcPassword");
add("mode");
- add("queueName");
+ add("rocketmqTopic");
}
};
@@ -278,7 +278,7 @@ public class Config {
this.tableTypes = tableTypes;
}
- public int getPollInterval() {
+ public long getPollInterval() {
return pollInterval;
}
diff --git a/src/main/java/org/apache/rocketmq/connect/jdbc/Replicator.java b/src/main/java/org/apache/rocketmq/connect/jdbc/Replicator.java
new file mode 100644
index 0000000..b24b7e5
--- /dev/null
+++ b/src/main/java/org/apache/rocketmq/connect/jdbc/Replicator.java
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.connect.jdbc;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Holds the state shared between the {@link EventProcessor} and its consumers: the queue of
+ * received transactions and the most recently committed position.
+ */
+public class Replicator {
+
+    private static final Logger LOGGER = LoggerFactory.getLogger(Replicator.class);
+
+    private static final Logger POSITION_LOGGER = LoggerFactory.getLogger("PositionLogger");
+
+    private Config config;
+
+    private EventProcessor eventProcessor;
+
+    /** Guards xid, nextBinlogPosition and nextQueueOffset so they are read and written together. */
+    private Object lock = new Object();
+    private BinlogPosition nextBinlogPosition;
+    private long nextQueueOffset;
+    private long xid;
+    private BlockingQueue<Transaction> queue = new LinkedBlockingQueue<>();
+
+    public Replicator(Config config){
+        this.config = config;
+    }
+
+    /**
+     * Creates and starts the event processor. A failure is logged and not rethrown.
+     */
+    public void start() {
+
+        try {
+
+            eventProcessor = new EventProcessor(this);
+            eventProcessor.start();
+
+        } catch (Exception e) {
+            LOGGER.error("Start error.", e);
+        }
+    }
+
+    /**
+     * Stops the event processor if one was created.
+     */
+    public void stop(){
+        // eventProcessor is still null when start() was never called or failed while
+        // constructing the processor; guard so that stop() does not throw in that case.
+        if (eventProcessor != null) {
+            eventProcessor.stop();
+        }
+    }
+
+    /**
+     * Enqueues the transaction and, when it is complete, records its position.
+     *
+     * @param transaction the transaction to enqueue
+     * @param isComplete  whether the transaction's xid and next binlog position should be recorded
+     */
+    public void commit(Transaction transaction, boolean isComplete) {
+
+        queue.add(transaction);
+        // Recording the position is attempted up to three times; each failure is logged.
+        for (int i = 0; i < 3; i++) {
+            try {
+                if (isComplete) {
+                    // The queue offset is currently the constant 1.
+                    long offset = 1;
+                    synchronized (lock) {
+                        xid = transaction.getXid();
+                        nextBinlogPosition = transaction.getNextBinlogPosition();
+                        nextQueueOffset = offset;
+                    }
+                }
+                break;
+
+            } catch (Exception e) {
+                LOGGER.error("Push error,retry:" + (i + 1) + ",", e);
+            }
+        }
+    }
+
+    /**
+     * Writes the last recorded position to the position logger. Nothing is logged until a
+     * complete transaction has been committed.
+     */
+    public void logPosition() {
+
+        String binlogFilename = null;
+        long xid = 0L;
+        long nextPosition = 0L;
+        long nextOffset = 0L;
+
+        // Copy the fields under the lock, then log outside of it.
+        synchronized (lock) {
+            if (nextBinlogPosition != null) {
+                xid = this.xid;
+                binlogFilename = nextBinlogPosition.getBinlogFilename();
+                nextPosition = nextBinlogPosition.getPosition();
+                nextOffset = nextQueueOffset;
+            }
+        }
+
+        if (binlogFilename != null) {
+            POSITION_LOGGER.info("XID: {}, BINLOG_FILE: {}, NEXT_POSITION: {}, NEXT_OFFSET: {}",
+                xid, binlogFilename, nextPosition, nextOffset);
+        }
+
+    }
+
+    public Config getConfig() {
+        return config;
+    }
+
+//    public BinlogPosition getNextBinlogPosition() {
+//        return nextBinlogPosition;
+//    }
+
+    public BlockingQueue<Transaction> getQueue() {
+        return queue;
+    }
+}
diff --git a/src/main/java/org/apache/rocketmq/connect/jdbc/connector/JdbcSourceTask.java b/src/main/java/org/apache/rocketmq/connect/jdbc/connector/JdbcSourceTask.java
index adb1c62..32ea763 100644
--- a/src/main/java/org/apache/rocketmq/connect/jdbc/connector/JdbcSourceTask.java
+++ b/src/main/java/org/apache/rocketmq/connect/jdbc/connector/JdbcSourceTask.java
@@ -18,11 +18,106 @@
*/
package org.apache.rocketmq.connect.jdbc.connector;
-
import io.openmessaging.connector.api.source.SourceTask;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.LinkedList;
+import java.util.List;
+import org.apache.rocketmq.connect.jdbc.Config;
+import org.apache.rocketmq.connect.jdbc.Replicator;
+import org.apache.rocketmq.connect.jdbc.schema.Table;
+import org.apache.rocketmq.connect.jdbc.source.Querier;
+import org.apache.rocketmq.connect.jdbc.schema.column.*;
-public abstract class JdbcSourceTask extends SourceTask {
-/*
- * To Be Continued
- */
-}
\ No newline at end of file
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.alibaba.fastjson.JSON;
+import io.openmessaging.KeyValue;
+import io.openmessaging.connector.api.data.EntryType;
+import io.openmessaging.connector.api.data.Schema;
+import io.openmessaging.connector.api.data.SourceDataEntry;
+
+import com.alibaba.fastjson.JSONObject;
+import io.openmessaging.connector.api.data.DataEntryBuilder;
+import io.openmessaging.connector.api.data.Field;
+
+/**
+ * Source task that polls rows through a {@link Querier} and converts each polled
+ * {@link Table} row into a {@link SourceDataEntry}.
+ */
+public class JdbcSourceTask extends SourceTask {
+
+    private static final Logger log = LoggerFactory.getLogger(JdbcSourceTask.class);
+
+    private Replicator replicator;
+
+    private Config config;
+
+    private List<Table> list=new LinkedList<>();
+
+    Querier querier = new Querier();
+
+    /**
+     * Polls the querier once and builds one entry per returned row. Any exception is logged
+     * and the entries built so far are returned.
+     *
+     * @return the entries built from this poll, possibly empty
+     */
+    @Override
+    public Collection<SourceDataEntry> poll() {
+        List<SourceDataEntry> res = new ArrayList<>();
+        try {
+            JSONObject jsonObject = new JSONObject();
+            jsonObject.put("nextQuery", "database");
+            jsonObject.put("nextPosition", "10");
+            //To be Continued
+            querier.poll();
+            log.debug("Polled {} rows", querier.getList().size());
+            for (Table dataRow : querier.getList()) {
+                // Describe the row: one field per column, typed from the raw data type.
+                Schema schema = new Schema();
+                schema.setDataSource(dataRow.getDatabase());
+                schema.setName(dataRow.getName());
+                schema.setFields(new ArrayList<>());
+                for (int i = 0; i < dataRow.getColList().size(); i++) {
+                    String columnName = dataRow.getColList().get(i);
+                    String rawDataType = dataRow.getRawDataTypeList().get(i);
+                    Field field = new Field(i, columnName, ColumnParser.mapConnectorFieldType(rawDataType));
+                    schema.getFields().add(field);
+                }
+                DataEntryBuilder dataEntryBuilder = new DataEntryBuilder(schema);
+                dataEntryBuilder.timestamp(System.currentTimeMillis())
+                    .queue(dataRow.getName())
+                    .entryType(EntryType.CREATE);
+                // Every column value is stored as its JSON string form.
+                for (int i = 0; i < dataRow.getColList().size(); i++) {
+                    Object value = dataRow.getDataList().get(i);
+                    log.debug("{}|{}", dataRow.getColList().get(i), value);
+                    dataEntryBuilder.putFiled(dataRow.getColList().get(i), JSON.toJSONString(value));
+                }
+                SourceDataEntry sourceDataEntry = dataEntryBuilder.buildSourceDataEntry(
+                    ByteBuffer.wrap(config.jdbcUrl.getBytes("UTF-8")),
+                    ByteBuffer.wrap(jsonObject.toJSONString().getBytes("UTF-8")));
+                res.add(sourceDataEntry);
+            }
+        } catch (Exception e) {
+            log.error("JDBC task poll error, current config:" + JSON.toJSONString(config), e);
+        }
+        return res;
+    }
+
+    /**
+     * Loads the configuration from the given properties and starts the querier. A failure is
+     * logged and not rethrown.
+     *
+     * @param props the task configuration
+     */
+    @Override
+    public void start(KeyValue props) {
+        try {
+            this.config = new Config();
+            this.config.load(props);
+            querier.start();
+        } catch (Exception e) {
+            log.error("JDBC task start failed.", e);
+        }
+    }
+
+    @Override
+    public void stop() {
+        // replicator is not assigned anywhere in this class, so an unguarded call
+        // would always throw a NullPointerException.
+        if (replicator != null) {
+            replicator.stop();
+        }
+    }
+
+    @Override public void pause() {
+
+    }
+
+    @Override public void resume() {
+
+    }
+}
diff --git a/src/main/java/org/apache/rocketmq/connect/jdbc/schema/Database.java b/src/main/java/org/apache/rocketmq/connect/jdbc/schema/Database.java
new file mode 100644
index 0000000..15fb77b
--- /dev/null
+++ b/src/main/java/org/apache/rocketmq/connect/jdbc/schema/Database.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.connect.jdbc.schema;
+
+//import io.openmessaging.mysql.binlog.EventProcessor;
+import org.apache.rocketmq.connect.jdbc.schema.column.ColumnParser;
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.HashMap;
+import java.util.Map;
+import javax.sql.DataSource;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Column metadata of one database schema, loaded from {@code information_schema.columns}
+ * and grouped by table.
+ */
+public class Database {
+    private static final String SQL = "select table_name,column_name,data_type,column_type,character_set_name " +
+        "from information_schema.columns " +
+        "where table_schema = ? order by ORDINAL_POSITION";
+    private String name;
+    private DataSource dataSource;
+    public Map<String, Table> tableMap = new HashMap<String, Table>();
+    public Database(String name, DataSource dataSource) {
+        this.name = name;
+        this.dataSource = dataSource;
+    }
+
+    /**
+     * Reads every column of this schema and registers it on its {@link Table}.
+     *
+     * @throws SQLException if the metadata query fails
+     */
+    public void init() throws SQLException {
+        // try-with-resources closes the result set, the statement and the connection in
+        // reverse order of acquisition, and still closes the others if one close() throws.
+        try (Connection conn = dataSource.getConnection();
+             PreparedStatement ps = conn.prepareStatement(SQL)) {
+
+            ps.setString(1, name);
+
+            try (ResultSet rs = ps.executeQuery()) {
+                while (rs.next()) {
+                    String tableName = rs.getString(1);
+                    String colName = rs.getString(2);
+                    String dataType = rs.getString(3);
+                    String colType = rs.getString(4);
+                    String charset = rs.getString(5);
+
+                    ColumnParser columnParser = ColumnParser.getColumnParser(dataType, colType, charset);
+
+                    if (!tableMap.containsKey(tableName)) {
+                        addTable(tableName);
+                    }
+                    Table table = tableMap.get(tableName);
+                    table.addCol(colName);
+                    table.addParser(columnParser);
+                    table.addRawDataType(dataType);
+                }
+            }
+        }
+
+    }
+
+    /** Registers an empty {@link Table} under the given name. */
+    private void addTable(String tableName) {
+
+        // LOGGER.info("Schema load -- DATABASE:{},\tTABLE:{}", name, tableName);
+
+        Table table = new Table(name, tableName);
+        tableMap.put(tableName, table);
+    }
+
+    /**
+     * @param tableName the table to look up
+     * @return the table, or {@code null} if it has not been registered
+     */
+    public Table getTable(String tableName) {
+
+        return tableMap.get(tableName);
+    }
+}
diff --git a/src/main/java/org/apache/rocketmq/connect/jdbc/schema/Schema.java b/src/main/java/org/apache/rocketmq/connect/jdbc/schema/Schema.java
new file mode 100644
index 0000000..6ce6621
--- /dev/null
+++ b/src/main/java/org/apache/rocketmq/connect/jdbc/schema/Schema.java
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.connect.jdbc.schema;
+
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import javax.sql.DataSource;
+//import io.openmessaging.mysql.binlog.EventProcessor;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Metadata of every non-system database reachable through the data source, loaded on
+ * demand and cached until {@link #reset()} is called.
+ */
+public class Schema {
+//    private static final Logger LOGGER = LoggerFactory.getLogger(EventProcessor.class);
+
+    // Lists the databases.
+    private static final String SQL = "select schema_name from information_schema.schemata";
+
+    // Databases that are ignored.
+    private static final List<String> IGNORED_DATABASES = new ArrayList<>(
+        Arrays.asList(new String[] {"information_schema", "mysql", "performance_schema", "sys"})
+    );
+
+    private DataSource dataSource;
+
+    public Map<String, Database> dbMap;
+
+    public Schema(DataSource dataSource) {
+        this.dataSource = dataSource;
+    }
+
+    /**
+     * Rebuilds {@code dbMap} from the database list and initializes every entry.
+     *
+     * @throws SQLException if listing the databases or initializing one of them fails
+     */
+    public void load() throws SQLException {
+
+        dbMap = new HashMap<>();
+
+        // try-with-resources closes the result set, the statement and the connection in
+        // reverse order of acquisition, and still closes the others if one close() throws.
+        try (Connection conn = dataSource.getConnection();
+             PreparedStatement ps = conn.prepareStatement(SQL);
+             ResultSet rs = ps.executeQuery()) {
+
+            while (rs.next()) {
+                String dbName = rs.getString(1);
+                if (!IGNORED_DATABASES.contains(dbName)) {
+                    // dbMap is keyed by database name.
+                    Database database = new Database(dbName, dataSource);
+                    dbMap.put(dbName, database);
+                }
+            }
+        }
+
+        for (Database db : dbMap.values()) {
+            db.init();
+        }
+
+    }
+
+    /**
+     * Looks up a table, loading the schema first if it is not cached.
+     *
+     * @param dbName    the database name
+     * @param tableName the table name
+     * @return the table, or {@code null} if the database or the table is unknown
+     */
+    public Table getTable(String dbName, String tableName) {
+
+        if (dbMap == null) {
+            reload();
+        }
+
+        Database database = dbMap.get(dbName);
+        if (database == null) {
+            return null;
+        }
+
+        return database.getTable(tableName);
+    }
+
+    /**
+     * Calls {@link #load()} until it succeeds.
+     */
+    private void reload() {
+
+        // NOTE(review): this retries immediately and without limit; a persistent failure
+        // spins here. Consider a back-off or a retry cap.
+        while (true) {
+            try {
+                load();
+                break;
+            } catch (Exception e) {
+                // LOGGER.error("Reload schema error.", e);
+                System.out.println("Reload schema error."+e);
+            }
+        }
+    }
+
+    /** Drops the cached metadata so that the next lookup reloads it. */
+    public void reset() {
+        dbMap = null;
+    }
+}
diff --git a/src/main/java/org/apache/rocketmq/connect/jdbc/schema/Table.java b/src/main/java/org/apache/rocketmq/connect/jdbc/schema/Table.java
new file mode 100644
index 0000000..c0d793d
--- /dev/null
+++ b/src/main/java/org/apache/rocketmq/connect/jdbc/schema/Table.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.connect.jdbc.schema;
+
+import org.apache.rocketmq.connect.jdbc.schema.column.ColumnParser;
+import java.util.LinkedList;
+import java.util.List;
+
+/**
+ * A table identified by database and name, carrying parallel lists of column names,
+ * column parsers, raw data types and data values.
+ */
+public class Table {
+
+    private String database;
+    private String name;
+    private List<String> colList = new LinkedList<>();
+    private List<ColumnParser> parserList = new LinkedList<>();
+    private List<String> rawDataTypeList = new LinkedList<>();
+    private List<Object> dataList =new LinkedList<>();
+
+    public Table(String database, String table) {
+        this.database = database;
+        this.name = table;
+    }
+
+    // ---- identity ----
+
+    public String getDatabase() {
+        return this.database;
+    }
+
+    public String getName() {
+        return this.name;
+    }
+
+    // ---- column names ----
+
+    public void addCol(String column) {
+        this.colList.add(column);
+    }
+
+    public List<String> getColList() {
+        return this.colList;
+    }
+
+    public void setColList(List<String> colList) {
+        this.colList = colList;
+    }
+
+    // ---- column parsers ----
+
+    public void addParser(ColumnParser columnParser) {
+        this.parserList.add(columnParser);
+    }
+
+    public List<ColumnParser> getParserList() {
+        return this.parserList;
+    }
+
+    public void setParserList(List<ColumnParser> parserList) {
+        this.parserList = parserList;
+    }
+
+    // ---- raw data types ----
+
+    public void addRawDataType(String rawDataType){
+        this.rawDataTypeList.add(rawDataType);
+    }
+
+    public List<String> getRawDataTypeList() {
+        return this.rawDataTypeList;
+    }
+
+    public void setRawDataTypeList(List<String> rawDataTypeList) {
+        this.rawDataTypeList = rawDataTypeList;
+    }
+
+    // ---- data values ----
+
+    public List<Object> getDataList() {
+        return this.dataList;
+    }
+
+    public void setDataList(List<Object> dataList) {
+        this.dataList = dataList;
+    }
+
+}
\ No newline at end of file
diff --git a/src/main/java/org/apache/rocketmq/connect/jdbc/connector/JdbcSourceTask.java b/src/main/java/org/apache/rocketmq/connect/jdbc/schema/column/BigIntColumnParser.java
similarity index 52%
copy from src/main/java/org/apache/rocketmq/connect/jdbc/connector/JdbcSourceTask.java
copy to src/main/java/org/apache/rocketmq/connect/jdbc/schema/column/BigIntColumnParser.java
index adb1c62..610f07d 100644
--- a/src/main/java/org/apache/rocketmq/connect/jdbc/connector/JdbcSourceTask.java
+++ b/src/main/java/org/apache/rocketmq/connect/jdbc/schema/column/BigIntColumnParser.java
@@ -1,5 +1,3 @@
-
-
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
@@ -17,12 +15,36 @@
* limitations under the License.
*/
-package org.apache.rocketmq.connect.jdbc.connector;
+package org.apache.rocketmq.connect.jdbc.schema.column;
-import io.openmessaging.connector.api.source.SourceTask;
+import java.math.BigInteger;
-public abstract class JdbcSourceTask extends SourceTask {
-/*
- * To Be Continued
- */
-}
\ No newline at end of file
+/**
+ * Parser for {@code bigint} columns. A negative {@code Long} read from an unsigned column
+ * is shifted up by 2^64.
+ */
+public class BigIntColumnParser extends ColumnParser {
+
+    /** 2^64, the offset added to a negative value of an unsigned column. */
+    private static BigInteger max = BigInteger.ONE.shiftLeft(64);
+
+    private boolean signed;
+
+    public BigIntColumnParser(String colType) {
+        boolean unsigned = colType.matches(".* unsigned$");
+        this.signed = !unsigned;
+    }
+
+    @Override
+    public Object getValue(Object value) {
+        // null and BigInteger values are handed back untouched.
+        if (value == null || value instanceof BigInteger) {
+            return value;
+        }
+
+        Long number = (Long) value;
+        boolean wrapped = !signed && number < 0;
+        if (wrapped) {
+            return max.add(BigInteger.valueOf(number));
+        }
+        return number;
+    }
+}
diff --git a/src/main/java/org/apache/rocketmq/connect/jdbc/schema/column/ColumnParser.java b/src/main/java/org/apache/rocketmq/connect/jdbc/schema/column/ColumnParser.java
new file mode 100644
index 0000000..341064e
--- /dev/null
+++ b/src/main/java/org/apache/rocketmq/connect/jdbc/schema/column/ColumnParser.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.connect.jdbc.schema.column;
+
+import io.openmessaging.connector.api.data.FieldType;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+/**
+ * Base class of the per-type column value parsers, plus the static lookups that map a
+ * column's data type to its parser and to its connector {@link FieldType}.
+ */
+public abstract class ColumnParser {
+
+    /** Matches an {@code enum(...)} or {@code set(...)} column type; group 2 is the value list. */
+    private static final Pattern ENUM_OR_SET_PATTERN = Pattern.compile("(enum|set)\\((.*)\\)");
+
+    /**
+     * Selects the parser for a column.
+     *
+     * @param dataType the column's data type, e.g. {@code "int"}
+     * @param colType  the full column type, e.g. {@code "int(10) unsigned"}
+     * @param charset  the column's character set name
+     * @return the matching parser; a {@link DefaultColumnParser} for any type not listed
+     */
+    public static ColumnParser getColumnParser(String dataType, String colType, String charset) {
+
+        switch (dataType) {
+            case "tinyint":
+            case "smallint":
+            case "mediumint":
+            case "int":
+                return new IntColumnParser(dataType, colType);
+            case "bigint":
+                return new BigIntColumnParser(colType);
+            case "tinytext":
+            case "text":
+            case "mediumtext":
+            case "longtext":
+            case "varchar":
+            case "char":
+                return new StringColumnParser(charset);
+            case "date":
+            case "datetime":
+            case "timestamp":
+                return new DateTimeColumnParser();
+            case "time":
+                return new TimeColumnParser();
+            case "year":
+                return new YearColumnParser();
+            case "enum":
+                return new EnumColumnParser(colType);
+            case "set":
+                return new SetColumnParser(colType);
+            default:
+                return new DefaultColumnParser();
+        }
+    }
+
+    /**
+     * Maps a column data type to the connector field type.
+     *
+     * @param dataType the column's data type
+     * @return the field type; {@code null} for {@code enum} and {@code set},
+     *         {@link FieldType#BYTES} for any type not listed
+     */
+    public static FieldType mapConnectorFieldType(String dataType) {
+
+        switch (dataType) {
+            case "tinyint":
+            case "smallint":
+            case "mediumint":
+            case "int":
+                return FieldType.INT32;
+            case "bigint":
+                return FieldType.BIG_INTEGER;
+            case "tinytext":
+            case "text":
+            case "mediumtext":
+            case "longtext":
+            case "varchar":
+            case "char":
+                return FieldType.STRING;
+            case "date":
+            case "datetime":
+            case "timestamp":
+            case "time":
+            case "year":
+                return FieldType.DATETIME;
+            // NOTE(review): enum and set map to null; confirm that callers building a Field
+            // accept a null type.
+            case "enum":
+                return null;
+            case "set":
+                return null;
+            default:
+                return FieldType.BYTES;
+        }
+    }
+
+    /**
+     * Extracts the declared values of an {@code enum(...)} or {@code set(...)} column type.
+     * All single quotes are removed and the list is split on commas.
+     *
+     * @param colType the full column type
+     * @return the declared values in order, or an empty array if the type does not match
+     */
+    public static String[] extractEnumValues(String colType) {
+        String[] enumValues = {};
+        // The pattern is compiled once instead of on every call.
+        Matcher matcher = ENUM_OR_SET_PATTERN.matcher(colType);
+        if (matcher.matches()) {
+            enumValues = matcher.group(2).replace("'", "").split(",");
+        }
+
+        return enumValues;
+    }
+
+    /**
+     * Converts a raw column value to the form emitted by the connector.
+     *
+     * @param value the raw value, possibly {@code null}
+     * @return the converted value
+     */
+    public abstract Object getValue(Object value);
+
+}
diff --git a/src/main/java/org/apache/rocketmq/connect/jdbc/schema/column/DateTimeColumnParser.java b/src/main/java/org/apache/rocketmq/connect/jdbc/schema/column/DateTimeColumnParser.java
new file mode 100644
index 0000000..8736280
--- /dev/null
+++ b/src/main/java/org/apache/rocketmq/connect/jdbc/schema/column/DateTimeColumnParser.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.connect.jdbc.schema.column;
+
+import java.sql.Timestamp;
+import java.text.SimpleDateFormat;
+import java.util.Date;
+import java.util.TimeZone;
+
+/**
+ * Parser that renders date/time values as {@code yyyy-MM-dd HH:mm:ss} strings.
+ */
+public class DateTimeColumnParser extends ColumnParser {
+
+    // SimpleDateFormat is not thread-safe; every use of these shared instances is
+    // synchronized on the instance itself.
+    private static final SimpleDateFormat dateTimeFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
+    private static final SimpleDateFormat dateTimeUtcFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
+
+    static {
+        dateTimeUtcFormat.setTimeZone(TimeZone.getTimeZone("UTC"));
+    }
+
+    /**
+     * @param value a {@link Timestamp}, a {@code Long} of epoch milliseconds, or any other value
+     * @return a Timestamp formatted in the default time zone, a Long formatted in UTC,
+     *         and any other value (including {@code null}) unchanged
+     */
+    @Override
+    public Object getValue(Object value) {
+
+        if (value == null) {
+            return null;
+        }
+
+        if (value instanceof Timestamp) {
+            synchronized (dateTimeFormat) {
+                return dateTimeFormat.format(value);
+            }
+        }
+
+        if (value instanceof Long) {
+            synchronized (dateTimeUtcFormat) {
+                return dateTimeUtcFormat.format(new Date((Long) value));
+            }
+        }
+
+        return value;
+    }
+}
diff --git a/src/main/java/org/apache/rocketmq/connect/jdbc/connector/JdbcSourceTask.java b/src/main/java/org/apache/rocketmq/connect/jdbc/schema/column/DefaultColumnParser.java
similarity index 65%
copy from src/main/java/org/apache/rocketmq/connect/jdbc/connector/JdbcSourceTask.java
copy to src/main/java/org/apache/rocketmq/connect/jdbc/schema/column/DefaultColumnParser.java
index adb1c62..ee3075a 100644
--- a/src/main/java/org/apache/rocketmq/connect/jdbc/connector/JdbcSourceTask.java
+++ b/src/main/java/org/apache/rocketmq/connect/jdbc/schema/column/DefaultColumnParser.java
@@ -1,5 +1,3 @@
-
-
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
@@ -17,12 +15,23 @@
* limitations under the License.
*/
-package org.apache.rocketmq.connect.jdbc.connector;
+package org.apache.rocketmq.connect.jdbc.schema.column;
-import io.openmessaging.connector.api.source.SourceTask;
+import org.apache.commons.codec.binary.Base64;
-public abstract class JdbcSourceTask extends SourceTask {
-/*
- * To Be Continued
- */
-}
\ No newline at end of file
+/**
+ * Fallback parser: byte arrays are Base64-encoded, every other value is passed through.
+ */
+public class DefaultColumnParser extends ColumnParser {
+
+    @Override
+    public Object getValue(Object value) {
+        // instanceof is false for null, so null is returned unchanged like any non-array value.
+        if (!(value instanceof byte[])) {
+            return value;
+        }
+
+        byte[] raw = (byte[]) value;
+        return Base64.encodeBase64String(raw);
+    }
+}
diff --git a/src/main/java/org/apache/rocketmq/connect/jdbc/connector/JdbcSourceTask.java b/src/main/java/org/apache/rocketmq/connect/jdbc/schema/column/EnumColumnParser.java
similarity index 57%
copy from src/main/java/org/apache/rocketmq/connect/jdbc/connector/JdbcSourceTask.java
copy to src/main/java/org/apache/rocketmq/connect/jdbc/schema/column/EnumColumnParser.java
index adb1c62..0fd14ba 100644
--- a/src/main/java/org/apache/rocketmq/connect/jdbc/connector/JdbcSourceTask.java
+++ b/src/main/java/org/apache/rocketmq/connect/jdbc/schema/column/EnumColumnParser.java
@@ -1,5 +1,3 @@
-
-
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
@@ -17,12 +15,32 @@
* limitations under the License.
*/
-package org.apache.rocketmq.connect.jdbc.connector;
+package org.apache.rocketmq.connect.jdbc.schema.column;
-import io.openmessaging.connector.api.source.SourceTask;
+public class EnumColumnParser extends ColumnParser {
-public abstract class JdbcSourceTask extends SourceTask {
-/*
- * To Be Continued
- */
-}
\ No newline at end of file
+ private String[] enumValues;
+
+    /** Reads the declared enum values out of the full column type. */
+    public EnumColumnParser(String colType) {
+        this.enumValues = ColumnParser.extractEnumValues(colType);
+    }
+
+    /**
+     * Resolves an enum value: null and String values are returned as they are, an Integer is
+     * treated as a 1-based index into the declared values, with 0 mapping to null.
+     */
+    @Override
+    public Object getValue(Object value) {
+        if (value == null || value instanceof String) {
+            return value;
+        }
+
+        int index = (Integer) value;
+        return index == 0 ? null : enumValues[index - 1];
+    }
+}
diff --git a/src/main/java/org/apache/rocketmq/connect/jdbc/schema/column/IntColumnParser.java b/src/main/java/org/apache/rocketmq/connect/jdbc/schema/column/IntColumnParser.java
new file mode 100644
index 0000000..36c6078
--- /dev/null
+++ b/src/main/java/org/apache/rocketmq/connect/jdbc/schema/column/IntColumnParser.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.connect.jdbc.schema.column;
+
+/**
+ * Parser for {@code tinyint}, {@code smallint}, {@code mediumint} and {@code int} columns.
+ * A negative {@code Integer} read from an unsigned column is shifted up by 2^bits.
+ */
+public class IntColumnParser extends ColumnParser {
+
+    /** Width of the column type in bits; stays 0 for a data type not listed below. */
+    private int bits;
+    private boolean signed;
+
+    /**
+     * @param dataType the column's data type, which determines the bit width
+     * @param colType  the full column type; a trailing {@code " unsigned"} marks it unsigned
+     */
+    public IntColumnParser(String dataType, String colType) {
+
+        switch (dataType) {
+            case "tinyint":
+                bits = 8;
+                break;
+            case "smallint":
+                bits = 16;
+                break;
+            case "mediumint":
+                bits = 24;
+                break;
+            case "int":
+                bits = 32;
+                break;
+            default:
+                break;
+        }
+
+        this.signed = !colType.matches(".* unsigned$");
+    }
+
+    /**
+     * @param value the raw value, possibly {@code null}
+     * @return an Integer from a signed column, or a non-negative Integer, unchanged; a negative
+     *         Integer from an unsigned column as a Long shifted up by 2^bits; anything else unchanged
+     */
+    @Override
+    public Object getValue(Object value) {
+
+        if (value == null) {
+            return null;
+        }
+
+        if (value instanceof Long) {
+            return value;
+        }
+
+        if (value instanceof Integer) {
+            Integer i = (Integer) value;
+            // Only negative values need the shift. Zero must be returned as is: with the
+            // former "i > 0" test an unsigned 0 was turned into 2^bits.
+            if (signed || i >= 0) {
+                return i;
+            } else {
+                return (1L << bits) + i;
+            }
+        }
+
+        return value;
+    }
+}
diff --git a/src/main/java/org/apache/rocketmq/connect/jdbc/schema/column/SetColumnParser.java b/src/main/java/org/apache/rocketmq/connect/jdbc/schema/column/SetColumnParser.java
new file mode 100644
index 0000000..d1e6bbc
--- /dev/null
+++ b/src/main/java/org/apache/rocketmq/connect/jdbc/schema/column/SetColumnParser.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.connect.jdbc.schema.column;
+
+/** Converts a SET column value given as a numeric bitmask into comma-separated member names. */
+public class SetColumnParser extends ColumnParser {
+    /** Member names produced by extractEnumValues from the column type. */
+    private String[] enumValues;
+
+    public SetColumnParser(String colType) {
+        enumValues = extractEnumValues(colType);
+    }
+
+    /**
+     * Returns null and Strings unchanged. Any other value must be a Number and is read as a
+     * bitmask in which bit i selects enumValues[i]; selected names are joined with ",".
+     */
+    @Override
+    public Object getValue(Object value) {
+        if (value == null || value instanceof String) {
+            return value;
+        }
+
+        // Accept any Number, not only Long, so an Integer bitmask does not fail the cast.
+        long mask = ((Number) value).longValue();
+        StringBuilder builder = new StringBuilder();
+        boolean needSplit = false;
+        for (int i = 0; i < enumValues.length; i++) {
+            if (((mask >> i) & 1) == 1) {
+                if (needSplit) {
+                    builder.append(",");
+                }
+                builder.append(enumValues[i]);
+                needSplit = true;
+            }
+        }
+        return builder.toString();
+    }
+}
diff --git a/src/main/java/org/apache/rocketmq/connect/jdbc/schema/column/StringColumnParser.java b/src/main/java/org/apache/rocketmq/connect/jdbc/schema/column/StringColumnParser.java
new file mode 100644
index 0000000..cd4f04f
--- /dev/null
+++ b/src/main/java/org/apache/rocketmq/connect/jdbc/schema/column/StringColumnParser.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.connect.jdbc.schema.column;
+
+import org.apache.commons.codec.Charsets;
+
+/** Decodes string column values that arrive as raw bytes, using the column's character set. */
+public class StringColumnParser extends ColumnParser {
+    /** Lower-cased character set name of the column. */
+    private String charset;
+
+    /** @param charset character set name of the column; must not be null */
+    public StringColumnParser(String charset) {
+        // Locale.ROOT keeps names such as "LATIN1" matching the labels below under a Turkish locale.
+        this.charset = charset.toLowerCase(java.util.Locale.ROOT);
+    }
+
+    /**
+     * Returns null and Strings unchanged; any other value is cast to byte[] and decoded with the
+     * Java charset selected by the column's character set name.
+     */
+    @Override
+    public Object getValue(Object value) {
+        if (value == null || value instanceof String) {
+            return value;
+        }
+
+        byte[] bytes = (byte[]) value;
+        switch (charset) {
+            case "utf8":
+            case "utf8mb4":
+                return new String(bytes, Charsets.UTF_8);
+            case "latin1":
+            case "ascii":
+                return new String(bytes, Charsets.ISO_8859_1);
+            case "ucs2":
+                return new String(bytes, Charsets.UTF_16);
+            default:
+                return new String(bytes, Charsets.toCharset(charset));
+        }
+    }
+}
diff --git a/src/main/java/org/apache/rocketmq/connect/jdbc/connector/JdbcSourceTask.java b/src/main/java/org/apache/rocketmq/connect/jdbc/schema/column/TimeColumnParser.java
similarity index 65%
copy from src/main/java/org/apache/rocketmq/connect/jdbc/connector/JdbcSourceTask.java
copy to src/main/java/org/apache/rocketmq/connect/jdbc/schema/column/TimeColumnParser.java
index adb1c62..9926d81 100644
--- a/src/main/java/org/apache/rocketmq/connect/jdbc/connector/JdbcSourceTask.java
+++ b/src/main/java/org/apache/rocketmq/connect/jdbc/schema/column/TimeColumnParser.java
@@ -1,5 +1,3 @@
-
-
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
@@ -17,12 +15,25 @@
* limitations under the License.
*/
-package org.apache.rocketmq.connect.jdbc.connector;
+package org.apache.rocketmq.connect.jdbc.schema.column;
-import io.openmessaging.connector.api.source.SourceTask;
+import java.sql.Time;
+import java.sql.Timestamp;
-public abstract class JdbcSourceTask extends SourceTask {
-/*
- * To Be Continued
- */
-}
\ No newline at end of file
+/** Converts Timestamp values to java.sql.Time; every other value passes through. */
+public class TimeColumnParser extends ColumnParser {
+
+    /**
+     * Returns a Time built from the same epoch milliseconds when {@code value} is a Timestamp;
+     * otherwise returns {@code value} as given, including null.
+     */
+    @Override
+    public Object getValue(Object value) {
+        if (!(value instanceof Timestamp)) {
+            return value;
+        }
+
+        long epochMillis = ((Timestamp) value).getTime();
+        return new Time(epochMillis);
+    }
+}
diff --git a/src/main/java/org/apache/rocketmq/connect/jdbc/connector/JdbcSourceTask.java b/src/main/java/org/apache/rocketmq/connect/jdbc/schema/column/YearColumnParser.java
similarity index 61%
copy from src/main/java/org/apache/rocketmq/connect/jdbc/connector/JdbcSourceTask.java
copy to src/main/java/org/apache/rocketmq/connect/jdbc/schema/column/YearColumnParser.java
index adb1c62..14cc798 100644
--- a/src/main/java/org/apache/rocketmq/connect/jdbc/connector/JdbcSourceTask.java
+++ b/src/main/java/org/apache/rocketmq/connect/jdbc/schema/column/YearColumnParser.java
@@ -1,5 +1,3 @@
-
-
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
@@ -17,12 +15,26 @@
* limitations under the License.
*/
-package org.apache.rocketmq.connect.jdbc.connector;
+package org.apache.rocketmq.connect.jdbc.schema.column;
-import io.openmessaging.connector.api.source.SourceTask;
+import java.sql.Date;
+import java.util.Calendar;
-public abstract class JdbcSourceTask extends SourceTask {
-/*
- * To Be Continued
- */
-}
\ No newline at end of file
+/** Reduces java.sql.Date values to their calendar year; every other value passes through. */
+public class YearColumnParser extends ColumnParser {
+
+    /**
+     * Returns the year as an Integer, read through a default Calendar instance, when
+     * {@code value} is a java.sql.Date; otherwise returns {@code value} as given, including null.
+     */
+    @Override
+    public Object getValue(Object value) {
+        if (!(value instanceof Date)) {
+            return value;
+        }
+
+        Calendar cal = Calendar.getInstance();
+        cal.setTime((Date) value);
+        return cal.get(Calendar.YEAR);
+    }
+}
diff --git a/src/main/java/org/apache/rocketmq/connect/jdbc/source/Querier.java b/src/main/java/org/apache/rocketmq/connect/jdbc/source/Querier.java
new file mode 100644
index 0000000..61323d4
--- /dev/null
+++ b/src/main/java/org/apache/rocketmq/connect/jdbc/source/Querier.java
@@ -0,0 +1,187 @@
+package org.apache.rocketmq.connect.jdbc.source;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Queue;
+import java.util.concurrent.ConcurrentLinkedQueue;
+
+import javax.sql.DataSource;
+
+import org.apache.rocketmq.connect.jdbc.Config;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.alibaba.druid.pool.DruidDataSourceFactory;
+
+
+import org.apache.rocketmq.connect.jdbc.schema.*;
+import org.apache.rocketmq.connect.jdbc.schema.column.ColumnParser;
+
+/** Loads the schema through a Druid pool and copies rows of "jdbc_db" tables into Table objects. */
+public class Querier {
+    static final String JDBC_DRIVER = "com.mysql.cj.jdbc.Driver";
+    private final Logger log = LoggerFactory.getLogger(getClass()); // use concrete subclass
+    protected String topicPrefix;
+    protected String jdbcUrl;
+    /** Connections handed out by getConnection(), kept so close() can release them. */
+    private final Queue<Connection> connections = new ConcurrentLinkedQueue<>();
+    private Config config = new Config();
+    /** Druid pool created by start(); null until then. */
+    private DataSource dataSource;
+    private List<Table> list = new LinkedList<>();
+    private Schema schema;
+    private Map<Long, Table> tableMap = new HashMap<>();
+
+    public List<Table> getList() {
+        return list;
+    }
+
+    public void setList(List<Table> list) {
+        this.list = list;
+    }
+
+    /**
+     * Opens a new DriverManager connection with the configured URL, user and password, and
+     * registers it so that close() can release it.
+     */
+    public Connection getConnection() throws SQLException {
+        // These config names are the same for both source and sink configs ...
+        String username = config.jdbcUsername;
+        String dbPassword = config.jdbcPassword;
+        // NOTE(review): initDataSource() prepends "jdbc:mysql://" to this value, but it is used raw here.
+        jdbcUrl = config.jdbcUrl;
+        Properties properties = new Properties();
+        if (username != null) {
+            properties.setProperty("user", username);
+        }
+        if (dbPassword != null) {
+            properties.setProperty("password", dbPassword);
+        }
+        Connection connection = DriverManager.getConnection(jdbcUrl, properties);
+        connections.add(connection);
+        return connection;
+    }
+
+    /** Closes every connection opened through getConnection(); failures are logged, not thrown. */
+    public void close() {
+        Connection conn;
+        while ((conn = connections.poll()) != null) {
+            try {
+                conn.close();
+            } catch (Throwable e) {
+                log.warn("Error while closing connection to {}", "jdbc", e);
+            }
+        }
+    }
+
+    /**
+     * Prepares the column-metadata query for the "jdbc_db" schema. The schema name is written as
+     * a quoted string literal; unquoted, MySQL would parse it as a column reference.
+     */
+    protected PreparedStatement createDBPreparedStatement(Connection db) throws SQLException {
+        String sql = "select table_name,column_name,data_type,column_type,character_set_name "
+            + "from information_schema.columns where table_schema = 'jdbc_db' order by ORDINAL_POSITION";
+        log.trace("Creating a PreparedStatement '{}'", sql);
+        return db.prepareStatement(sql);
+    }
+
+    /** Prepares "select * from" followed by the given text, which is concatenated without escaping. */
+    protected PreparedStatement createPreparedStatement(Connection db, String string) throws SQLException {
+        String query = "select * from " + string;
+        log.trace("Creating a PreparedStatement '{}'", query);
+        return db.prepareStatement(query);
+    }
+
+    protected ResultSet executeQuery(PreparedStatement stmt) throws SQLException {
+        return stmt.executeQuery();
+    }
+
+    /** Manual smoke test: loads the schema, then polls once, printing each row to standard output. */
+    public static void main(String[] args) throws Exception {
+        Querier querier = new Querier();
+        try {
+            querier.start();
+            querier.poll();
+        } catch (SQLException e) {
+            querier.log.error("Failed to query the database", e);
+        }
+    }
+
+    /**
+     * Appends to the list one Table per row of every table in databases whose name contains
+     * "jdbc_db". Connection, statements and result sets are closed on exit; SQL errors are logged.
+     */
+    public void poll() {
+        try (Connection conn = dataSource.getConnection()) { // returned to the pool on exit (maxActive is 2)
+            for (Map.Entry<String, Database> entry : schema.dbMap.entrySet()) {
+                String db = entry.getKey();
+                if (!db.contains("jdbc_db")) {
+                    continue;
+                }
+                for (Map.Entry<String, Table> tableEntry : entry.getValue().tableMap.entrySet()) {
+                    readTable(conn, db, tableEntry.getKey(), tableEntry.getValue());
+                }
+            }
+        } catch (SQLException e) {
+            log.error("Failed to poll tables", e);
+        }
+    }
+
+    /** Runs "select * from db.tb" and appends to the list one Table per row holding that row's values. */
+    private void readTable(Connection conn, String db, String tb, Table meta) throws SQLException {
+        try (PreparedStatement stmt = conn.prepareStatement("select * from " + db + "." + tb);
+             ResultSet rs = stmt.executeQuery()) {
+            List<String> colList = meta.getColList();
+            List<String> dataTypeList = meta.getRawDataTypeList();
+            List<ColumnParser> parserList = meta.getParserList();
+            while (rs.next()) {
+                Table table = new Table(db, tb);
+                System.out.print("|");
+                table.setColList(colList);
+                table.setRawDataTypeList(dataTypeList);
+                table.setParserList(parserList);
+                for (String column : colList) {
+                    Object cell = rs.getObject(column);
+                    table.getDataList().add(cell);
+                    System.out.print(column + " : " + cell + "|");
+                }
+                list.add(table);
+                System.out.println();
+            }
+        }
+    }
+
+    /** Creates the connection pool and loads the schema; call before poll(). */
+    public void start() throws Exception {
+        initDataSource();
+        schema = new Schema(dataSource);
+        schema.load();
+    }
+
+    /** Builds the Druid pool for "jdbc:mysql://" plus the configured URL, user and password. */
+    private void initDataSource() throws Exception {
+        Map<String, String> map = new HashMap<>();
+        map.put("driverClassName", JDBC_DRIVER);
+        map.put("url",
+            "jdbc:mysql://" + config.jdbcUrl + "?useSSL=true&verifyServerCertificate=false&serverTimezone=GMT%2B8");
+        map.put("username", config.jdbcUsername);
+        map.put("password", config.jdbcPassword);
+        map.put("initialSize", "2");
+        map.put("maxActive", "2");
+        map.put("maxWait", "60000");
+        map.put("timeBetweenEvictionRunsMillis", "60000");
+        map.put("minEvictableIdleTimeMillis", "300000");
+        map.put("validationQuery", "SELECT 1 FROM DUAL");
+        map.put("testWhileIdle", "true");
+        dataSource = DruidDataSourceFactory.createDataSource(map);
+    }
+}