You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by zh...@apache.org on 2022/03/02 03:20:22 UTC
[rocketmq-connect] 14/43: develop the jdbcsource connector
This is an automated email from the ASF dual-hosted git repository.
zhoubo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-connect.git
commit 488223c3aa3d15ce4676e6442d894cb99b06275e
Author: yuchenlichuck <yu...@126.com>
AuthorDate: Fri Aug 9 12:04:51 2019 +0800
develop the jdbcsource connector
---
README.md | 43 ++-
pom.xml | 8 +
.../org/apache/rocketmq/connect/jdbc/Config.java | 52 ++--
.../jdbc/connector/JdbcSourceConnector.java | 12 +-
.../connect/jdbc/connector/JdbcSourceTask.java | 39 +--
.../rocketmq/connect/jdbc/schema/Database.java | 4 +-
.../rocketmq/connect/jdbc/schema/Schema.java | 4 +-
.../apache/rocketmq/connect/jdbc/schema/Table.java | 36 +--
.../rocketmq/connect/jdbc/source/Querier.java | 314 ++++++++++-----------
9 files changed, 270 insertions(+), 242 deletions(-)
diff --git a/README.md b/README.md
index 96da884..32da4b5 100644
--- a/README.md
+++ b/README.md
@@ -149,34 +149,55 @@ httpPort=8081
看到日志目录查看connect_runtime.log
-如果看到以下日志说明runttiime启动成功了
-
-2019-07-16 10:56:24 INFO RebalanceService - RebalanceService service started
-2019-07-16 10:56:24 INFO main - The worker [DEFAULT_WORKER_1] boot success.
-
-2、启动sourceConnector
-
- 正在做测试(To be continued)已实现Bulk Mode
+windows用户可以用CMD到程序根目录下再输入:
+```
cd target/distribution/
java -cp .;./conf/;./lib/* org.apache.rocketmq.connect.runtime.ConnectStartup -c conf/connect.conf
+```
+如果看到以下日志说明runttiime启动成功了
+2019-07-16 10:56:24 INFO RebalanceService - RebalanceService service started
+2019-07-16 10:56:24 INFO main - The worker [DEFAULT_WORKER_1] boot success.
+
+2、启动sourceConnector
-在http中输入Get 请求
+```
+1、git clone https://github.com/apache/rocketmq-externals.git
+2、cd rocketmq-externals/rocketmq-connect-jdbc
+3、mvn -Dmaven.test.skip=true package
-示例
+```
-[http://127.0.0.1:8085/connectors/testSourceConnector1?config={"connector-class":"org.apache.rocketmq.connect.jdbc.connector.JdbcSourceConnector","jdbcUrl":"127.0.0.1:3306","jdbcUsername":"root","jdbcPassword":"123456","task-class":"org.apache.rocketmq.connect.jdbc.connector.JdbcSourceConnector","rocketmqTopic":"jdbcTopic","mode":"bulk","source-record-converter":"org.apache.rocketmq.connect.runtime.converter.JsonConverter"}](http://127.0.0.1:8085/connectors/testSourceConnector1?config={% [...]
+- 复制第三方jar至target
+```
+mvn dependency:copy-dependencies
+```
+已实现Bulk查询方法,在http中输入Get 请求(目前仅适配过MYSQL)
+```http
+http://127.0.0.1:8081/connectors/testSourceConnector1?config={"connector-class":"org.apache.rocketmq.connect.jdbc.connector.JdbcSourceConnector","jdbcUrl":"127.0.0.1:3306","jdbcUsername":"root","jdbcPassword":"123456","task-class":"org.apache.rocketmq.connect.jdbc.connector.JdbcSourceConnector","rocketmqTopic":"jdbcTopic","mode":"bulk","source-record-converter":"org.apache.rocketmq.connect.runtime.converter.JsonConverter"}
+```
+看到一下日志说明Jdbc source connector启动成功了
+2019-08-09 11:33:22 INFO RebalanceService - JdbcSourceConnector verifyAndSetConfig enter
+2019-08-09 11:33:23 INFO pool-9-thread-1 - Config.load.start
+2019-08-09 11:33:23 INFO pool-9-thread-1 - querier.start
+2019-08-09 11:33:23 INFO pool-9-thread-1 - {password=199812160, validationQuery=SELECT 1 FROM DUAL, testWhileIdle=true, timeBetweenEvictionRunsMillis=60000, minEvictableIdleTimeMillis=300000, initialSize=2, driverClassName=com.mysql.cj.jdbc.Driver, maxWait=60000, url=jdbc:mysql://localhost:3306?useSSL=true&verifyServerCertificate=false&serverTimezone=GMT%2B8, username=root, maxActive=2},config read successful
+2019-08-09 11:33:24 INFO RebalanceService - JdbcSourceConnector verifyAndSetConfig enter
+2019-08-09 11:33:25 INFO pool-9-thread-1 - {dataSource-1} inited
+2019-08-09 11:33:27 INFO pool-9-thread-1 - schema load successful
+2019-08-09 11:33:27 INFO pool-9-thread-1 - querier.poll
+3、启动sinkConnector
+To Be Continued.
\ No newline at end of file
diff --git a/pom.xml b/pom.xml
index 830f9ed..1d708f3 100644
--- a/pom.xml
+++ b/pom.xml
@@ -54,6 +54,14 @@
<artifactId>clirr-maven-plugin</artifactId>
<version>2.7</version>
</plugin>
+ <plugin>
+ <artifactId>maven-dependency-plugin</artifactId>
+ <configuration>
+ <outputDirectory>${project.build.directory}/lib</outputDirectory>
+ <excludeTransitive>false</excludeTransitive>
+ <stripVersion>true</stripVersion>
+ </configuration>
+ </plugin>
<plugin>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.6.1</version>
diff --git a/src/main/java/org/apache/rocketmq/connect/jdbc/Config.java b/src/main/java/org/apache/rocketmq/connect/jdbc/Config.java
index 4f7456b..533d53b 100644
--- a/src/main/java/org/apache/rocketmq/connect/jdbc/Config.java
+++ b/src/main/java/org/apache/rocketmq/connect/jdbc/Config.java
@@ -25,56 +25,56 @@ import java.lang.reflect.Method;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
+
public class Config {
@SuppressWarnings("serial")
private static final Logger LOG = LoggerFactory.getLogger(Config.class);
/* Database Connection Config */
- public String jdbcUrl="localhost:3306";
- public String jdbcUsername="root";
- public String jdbcPassword="199812160";
+ public String jdbcUrl = "localhost:3306";
+ public String jdbcUsername = "root";
+ public String jdbcPassword = "199812160";
public String rocketmqTopic;
public String jdbcBackoff;
public String jdbcAttempts;
- public String catalogPattern=null;
+ public String catalogPattern = null;
public List tableWhitelist;
public List tableBlacklist;
- public String schemaPattern=null;
- public boolean numericPrecisionMapping=false;
- public String bumericMapping=null;
- public String dialectName="";
+ public String schemaPattern = null;
+ public boolean numericPrecisionMapping = false;
+ public String bumericMapping = null;
+ public String dialectName = "";
/* Mode Config */
- public String mode="";
- public String incrementingColumnName= "";
- public String query="";
- public String timestampColmnName="";
- public boolean validateNonNull=true;
+ public String mode = "";
+ public String incrementingColumnName = "";
+ public String query = "";
+ public String timestampColmnName = "";
+ public boolean validateNonNull = true;
/*Connector config*/
- public String tableTypes="table";
- public long pollInterval=5000;
- public int batchMaxRows=100;
- public long tablePollInterval=60000;
- public long timestampDelayInterval=0;
- public String dbTimezone="UTC";
+ public String tableTypes = "table";
+ public long pollInterval = 5000;
+ public int batchMaxRows = 100;
+ public long tablePollInterval = 60000;
+ public long timestampDelayInterval = 0;
+ public String dbTimezone = "UTC";
public String queueName;
private Logger log = LoggerFactory.getLogger(Config.class);
public static final Set<String> REQUEST_CONFIG = new HashSet<String>() {
{
- // add("jdbcUrl");
- // add("jdbcUsername");
- // add("jdbcPassword");
- // add("mode");
- // add("rocketmqTopic");
+ add("jdbcUrl");
+ add("jdbcUsername");
+ add("jdbcPassword");
+ // add("mode");
+ // add("rocketmqTopic");
}
};
-
public void load(KeyValue props) {
- log.info("Config.load.start");
+ log.info("Config.load.start");
properties2Object(props, this);
}
diff --git a/src/main/java/org/apache/rocketmq/connect/jdbc/connector/JdbcSourceConnector.java b/src/main/java/org/apache/rocketmq/connect/jdbc/connector/JdbcSourceConnector.java
index 8c30a62..bdbeb8b 100644
--- a/src/main/java/org/apache/rocketmq/connect/jdbc/connector/JdbcSourceConnector.java
+++ b/src/main/java/org/apache/rocketmq/connect/jdbc/connector/JdbcSourceConnector.java
@@ -37,7 +37,7 @@ public class JdbcSourceConnector extends SourceConnector {
@Override
public String verifyAndSetConfig(KeyValue config) {
- log.info("1216123 JdbcSourceConnector verifyAndSetConfig enter");
+ log.info("JdbcSourceConnector verifyAndSetConfig enter");
for (String requestKey : Config.REQUEST_CONFIG) {
if (!config.containsKey(requestKey)) {
@@ -45,7 +45,7 @@ public class JdbcSourceConnector extends SourceConnector {
}
}
this.config = config;
-
+
return "";
}
@@ -68,13 +68,13 @@ public class JdbcSourceConnector extends SourceConnector {
}
@Override
- public Class<? extends Task> taskClass(){
- return JdbcSourceTask.class;
- }
+ public Class<? extends Task> taskClass() {
+ return JdbcSourceTask.class;
+ }
@Override
public List<KeyValue> taskConfigs() {
- log.info("List.start");
+ log.info("List.start");
List<KeyValue> config = new ArrayList<>();
config.add(this.config);
return config;
diff --git a/src/main/java/org/apache/rocketmq/connect/jdbc/connector/JdbcSourceTask.java b/src/main/java/org/apache/rocketmq/connect/jdbc/connector/JdbcSourceTask.java
index 78f1809..91659ec 100644
--- a/src/main/java/org/apache/rocketmq/connect/jdbc/connector/JdbcSourceTask.java
+++ b/src/main/java/org/apache/rocketmq/connect/jdbc/connector/JdbcSourceTask.java
@@ -18,6 +18,7 @@
*/
package org.apache.rocketmq.connect.jdbc.connector;
+
import io.openmessaging.connector.api.source.SourceTask;
import java.nio.ByteBuffer;
import java.util.ArrayList;
@@ -47,32 +48,32 @@ public class JdbcSourceTask extends SourceTask {
private static final Logger log = LoggerFactory.getLogger(JdbcSourceTask.class);
private Config config;
-
- private List<Table> list=new LinkedList<>();
-
- Querier querier = new Querier();
+
+ private List<Table> list = new LinkedList<>();
+
+ Querier querier = new Querier();
+
@Override
public Collection<SourceDataEntry> poll() {
List<SourceDataEntry> res = new ArrayList<>();
try {
-
+
JSONObject jsonObject = new JSONObject();
jsonObject.put("nextQuery", "database");
jsonObject.put("nextPosition", "10");
- //To be Continued
- log.info("querier.poll");
+ //To be Continued
querier.poll();
- log.info("1216connector.start");
- int mm=0;
- for(Table dataRow : querier.getList()){
- System.out.println(dataRow.getColList().get(0));
- log.info("xunhuankaishi");
+ log.info("querier.poll, start");
+ int mm = 0;
+ for (Table dataRow : querier.getList()) {
+ System.out.println(dataRow.getColList().get(0));
+ log.info("xunhuankaishi");
log.info("Received {} record: {} ", dataRow.getColList().get(0), mm++);
- Schema schema = new Schema();
+ Schema schema = new Schema();
schema.setDataSource(dataRow.getDatabase());
schema.setName(dataRow.getName());
schema.setFields(new ArrayList<>());
- for(int i = 0; i < dataRow.getColList().size(); i++){
+ for (int i = 0; i < dataRow.getColList().size(); i++) {
String columnName = dataRow.getColList().get(i);
String rawDataType = dataRow.getRawDataTypeList().get(i);
Field field = new Field(i, columnName, ColumnParser.mapConnectorFieldType(rawDataType));
@@ -82,10 +83,10 @@ public class JdbcSourceTask extends SourceTask {
dataEntryBuilder.timestamp(System.currentTimeMillis())
.queue(dataRow.getName())
.entryType(EntryType.CREATE);
- for(int i = 0; i < dataRow.getColList().size(); i++){
- Object value=dataRow.getDataList().get(i);
+ for (int i = 0; i < dataRow.getColList().size(); i++) {
+ Object value = dataRow.getDataList().get(i);
System.out.println(1);
- System.out.println(dataRow.getColList().get(i)+"|"+value);
+ System.out.println(dataRow.getColList().get(i) + "|" + value);
dataEntryBuilder.putFiled(dataRow.getColList().get(i), JSON.toJSONString(value));
}
SourceDataEntry sourceDataEntry = dataEntryBuilder.buildSourceDataEntry(
@@ -104,8 +105,8 @@ public class JdbcSourceTask extends SourceTask {
try {
this.config = new Config();
this.config.load(props);
- log.info("querier.start");
- querier.start();
+ log.info("querier.start");
+ querier.start();
} catch (Exception e) {
log.error("JDBC task start failed.", e);
diff --git a/src/main/java/org/apache/rocketmq/connect/jdbc/schema/Database.java b/src/main/java/org/apache/rocketmq/connect/jdbc/schema/Database.java
index b88661d..657ebb8 100644
--- a/src/main/java/org/apache/rocketmq/connect/jdbc/schema/Database.java
+++ b/src/main/java/org/apache/rocketmq/connect/jdbc/schema/Database.java
@@ -18,6 +18,7 @@
package org.apache.rocketmq.connect.jdbc.schema;
//import io.openmessaging.mysql.binlog.EventProcessor;
+
import org.apache.rocketmq.connect.jdbc.schema.column.ColumnParser;
import java.sql.Connection;
import java.sql.PreparedStatement;
@@ -36,6 +37,7 @@ public class Database {
private String name;
private DataSource dataSource;
public Map<String, Table> tableMap = new HashMap<String, Table>();
+
public Database(String name, DataSource dataSource) {
this.name = name;
this.dataSource = dataSource;
@@ -59,7 +61,7 @@ public class Database {
String dataType = rs.getString(3);
String colType = rs.getString(4);
String charset = rs.getString(5);
-
+
ColumnParser columnParser = ColumnParser.getColumnParser(dataType, colType, charset);
if (!tableMap.containsKey(tableName)) {
diff --git a/src/main/java/org/apache/rocketmq/connect/jdbc/schema/Schema.java b/src/main/java/org/apache/rocketmq/connect/jdbc/schema/Schema.java
index 6ce6621..7434bbc 100644
--- a/src/main/java/org/apache/rocketmq/connect/jdbc/schema/Schema.java
+++ b/src/main/java/org/apache/rocketmq/connect/jdbc/schema/Schema.java
@@ -116,8 +116,8 @@ public class Schema {
load();
break;
} catch (Exception e) {
- // LOGGER.error("Reload schema error.", e);
- System.out.println("Reload schema error."+e);
+ // LOGGER.error("Reload schema error.", e);
+ System.out.println("Reload schema error." + e);
}
}
}
diff --git a/src/main/java/org/apache/rocketmq/connect/jdbc/schema/Table.java b/src/main/java/org/apache/rocketmq/connect/jdbc/schema/Table.java
index c0d793d..8c9a42d 100644
--- a/src/main/java/org/apache/rocketmq/connect/jdbc/schema/Table.java
+++ b/src/main/java/org/apache/rocketmq/connect/jdbc/schema/Table.java
@@ -28,7 +28,7 @@ public class Table {
private List<String> colList = new LinkedList<>();
private List<ColumnParser> parserList = new LinkedList<>();
private List<String> rawDataTypeList = new LinkedList<>();
- private List<Object> dataList =new LinkedList<>();
+ private List<Object> dataList = new LinkedList<>();
public Table(String database, String table) {
this.database = database;
@@ -40,18 +40,18 @@ public class Table {
}
public void setParserList(List<ColumnParser> parserList) {
- this.parserList = parserList;
- }
+ this.parserList = parserList;
+ }
- public void setRawDataTypeList(List<String> rawDataTypeList) {
- this.rawDataTypeList = rawDataTypeList;
- }
+ public void setRawDataTypeList(List<String> rawDataTypeList) {
+ this.rawDataTypeList = rawDataTypeList;
+ }
- public void addParser(ColumnParser columnParser) {
+ public void addParser(ColumnParser columnParser) {
parserList.add(columnParser);
}
- public void addRawDataType(String rawDataType){
+ public void addRawDataType(String rawDataType) {
this.rawDataTypeList.add(rawDataType);
}
@@ -75,16 +75,16 @@ public class Table {
return parserList;
}
- public List<Object> getDataList() {
- return dataList;
- }
+ public List<Object> getDataList() {
+ return dataList;
+ }
+
+ public void setDataList(List<Object> dataList) {
+ this.dataList = dataList;
+ }
- public void setDataList(List<Object> dataList) {
- this.dataList = dataList;
- }
+ public void setColList(List<String> colList) {
+ this.colList = colList;
+ }
- public void setColList(List<String> colList) {
- this.colList = colList;
- }
-
}
\ No newline at end of file
diff --git a/src/main/java/org/apache/rocketmq/connect/jdbc/source/Querier.java b/src/main/java/org/apache/rocketmq/connect/jdbc/source/Querier.java
index 1f630ea..3d5b2c6 100644
--- a/src/main/java/org/apache/rocketmq/connect/jdbc/source/Querier.java
+++ b/src/main/java/org/apache/rocketmq/connect/jdbc/source/Querier.java
@@ -22,168 +22,164 @@ import org.slf4j.LoggerFactory;
import com.alibaba.druid.pool.DruidDataSourceFactory;
-
import org.apache.rocketmq.connect.jdbc.schema.*;
import org.apache.rocketmq.connect.jdbc.schema.column.ColumnParser;
public class Querier {
- static final String JDBC_DRIVER = "com.mysql.cj.jdbc.Driver";
- private final Logger log = LoggerFactory.getLogger(getClass()); // use concrete subclass
- protected String topicPrefix;
- protected String jdbcUrl;
- private final Queue<Connection> connections = new ConcurrentLinkedQueue<>();
- private Config config = new Config();
- private DataSource dataSource;
- private List<Table> list=new LinkedList<>();
-
-
-
- public List<Table> getList() {
- return list;
- }
-
- public void setList(List<Table> list) {
- this.list = list;
- }
-
- public Connection getConnection() throws SQLException {
-
- // These config names are the same for both source and sink configs ...
- String username = config.jdbcUsername;
- String dbPassword = config.jdbcPassword;
- jdbcUrl = config.jdbcUrl;
- Properties properties = new Properties();
- if (username != null) {
- properties.setProperty("user", username);
- }
- if (dbPassword != null) {
- properties.setProperty("password", dbPassword);
- }
- Connection connection = DriverManager.getConnection(jdbcUrl, properties);
-
- connections.add(connection);
- return connection;
- }
-
- public void stop() {
- Connection conn;
- while ((conn = connections.poll()) != null) {
- try {
- conn.close();
- } catch (Throwable e) {
- log.warn("Error while closing connection to {}", "jdbc", e);
- }
- }
- }
-
- protected PreparedStatement createDBPreparedStatement(Connection db) throws SQLException {
-
- String SQL = "select table_name,column_name,data_type,column_type,character_set_name "
- + "from information_schema.columns " + "where table_schema = jdbc_db order by ORDINAL_POSITION";
-
- log.trace("Creating a PreparedStatement '{}'", SQL);
- PreparedStatement stmt = db.prepareStatement(SQL);
- return stmt;
-
- }
-
- protected PreparedStatement createPreparedStatement(Connection db, String string) throws SQLException {
- String query = "select * from " + string;
- log.trace("Creating a PreparedStatement '{}'", query);
- PreparedStatement stmt = db.prepareStatement(query);
- return stmt;
-
- }
-
- protected ResultSet executeQuery(PreparedStatement stmt) throws SQLException {
- return stmt.executeQuery();
- }
-
- public static void main(String[] args) throws Exception {
- Querier querier = new Querier();
- try {
- querier.start();
- querier.poll();
-
- } catch (SQLException e) {
- // TODO Auto-generated catch block
- e.printStackTrace();
- }
-
- }
-
- private Schema schema;
-
-
- public void poll() {
- try {
-
- PreparedStatement stmt;
- String query = "select * from ";
- Connection conn = dataSource.getConnection();
-
- for (Map.Entry<String, Database> entry : schema.dbMap.entrySet()) {
- String db = entry.getKey();
- if(!db.contains("jdbc_db"))
- continue;
- Iterator<Map.Entry<String, Table>> iterator = entry.getValue().tableMap.entrySet().iterator();
- while (iterator.hasNext()) {
- Map.Entry<String, Table> tableEntry = iterator.next();
- String tb=tableEntry.getKey();
- stmt = conn.prepareStatement(query+db + "." +tb);
- ResultSet rs;
- rs = stmt.executeQuery();
- List<String> colList = tableEntry.getValue().getColList();
- List<String> DataTypeList = tableEntry.getValue().getRawDataTypeList();
- List<ColumnParser> ParserList = tableEntry.getValue().getParserList();
-
- while(rs.next()) {
- Table table=new Table(db, tb);
- System.out.print("|");
- table.setColList(colList);
- table.setRawDataTypeList(DataTypeList);
- table.setParserList(ParserList);
-
- for (String string : colList) {
- table.getDataList().add(rs.getObject(string));
- System.out.print(string+" : "+rs.getObject(string)+"|");
- }
- list.add(table);
- System.out.println();
- }
- }
- }
-
- } catch (SQLException e) {
- e.printStackTrace();
- }
-
- }
-
- public void start() throws Exception {
- initDataSource();
- schema = new Schema(dataSource);
- schema.load();
- log.info("schema load successful");
- }
-
- private void initDataSource() throws Exception {
- Map<String, String> map = new HashMap<>();
- map.put("driverClassName", "com.mysql.cj.jdbc.Driver");
- map.put("url",
- "jdbc:mysql://" + config.jdbcUrl + "?useSSL=true&verifyServerCertificate=false&serverTimezone=GMT%2B8");
- map.put("username", config.jdbcUsername);
- map.put("password", config.jdbcPassword);
- map.put("initialSize", "2");
- map.put("maxActive", "2");
- map.put("maxWait", "60000");
- map.put("timeBetweenEvictionRunsMillis", "60000");
- map.put("minEvictableIdleTimeMillis", "300000");
- map.put("validationQuery", "SELECT 1 FROM DUAL");
- map.put("testWhileIdle", "true");
- log.info("{},config read successful",map);
- dataSource = DruidDataSourceFactory.createDataSource(map);
-
- }
+ static final String JDBC_DRIVER = "com.mysql.cj.jdbc.Driver";
+ private final Logger log = LoggerFactory.getLogger(getClass()); // use concrete subclass
+ protected String topicPrefix;
+ protected String jdbcUrl;
+ private final Queue<Connection> connections = new ConcurrentLinkedQueue<>();
+ private Config config = new Config();
+ private DataSource dataSource;
+ private List<Table> list = new LinkedList<>();
+
+ public List<Table> getList() {
+ return list;
+ }
+
+ public void setList(List<Table> list) {
+ this.list = list;
+ }
+
+ public Connection getConnection() throws SQLException {
+
+ // These config names are the same for both source and sink configs ...
+ String username = config.jdbcUsername;
+ String dbPassword = config.jdbcPassword;
+ jdbcUrl = config.jdbcUrl;
+ Properties properties = new Properties();
+ if (username != null) {
+ properties.setProperty("user", username);
+ }
+ if (dbPassword != null) {
+ properties.setProperty("password", dbPassword);
+ }
+ Connection connection = DriverManager.getConnection(jdbcUrl, properties);
+
+ connections.add(connection);
+ return connection;
+ }
+
+ public void stop() {
+ Connection conn;
+ while ((conn = connections.poll()) != null) {
+ try {
+ conn.close();
+ } catch (Throwable e) {
+ log.warn("Error while closing connection to {}", "jdbc", e);
+ }
+ }
+ }
+
+ protected PreparedStatement createDBPreparedStatement(Connection db) throws SQLException {
+
+ String SQL = "select table_name,column_name,data_type,column_type,character_set_name "
+ + "from information_schema.columns " + "where table_schema = jdbc_db order by ORDINAL_POSITION";
+
+ log.trace("Creating a PreparedStatement '{}'", SQL);
+ PreparedStatement stmt = db.prepareStatement(SQL);
+ return stmt;
+
+ }
+
+ protected PreparedStatement createPreparedStatement(Connection db, String string) throws SQLException {
+ String query = "select * from " + string;
+ log.trace("Creating a PreparedStatement '{}'", query);
+ PreparedStatement stmt = db.prepareStatement(query);
+ return stmt;
+
+ }
+
+ protected ResultSet executeQuery(PreparedStatement stmt) throws SQLException {
+ return stmt.executeQuery();
+ }
+
+ public static void main(String[] args) throws Exception {
+ Querier querier = new Querier();
+ try {
+ querier.start();
+ querier.poll();
+
+ } catch (SQLException e) {
+ // TODO Auto-generated catch block
+ e.printStackTrace();
+ }
+
+ }
+
+ private Schema schema;
+
+ public void poll() {
+ try {
+
+ PreparedStatement stmt;
+ String query = "select * from ";
+ Connection conn = dataSource.getConnection();
+
+ for (Map.Entry<String, Database> entry : schema.dbMap.entrySet()) {
+ String db = entry.getKey();
+ if (!db.contains("jdbc_db"))
+ continue;
+ Iterator<Map.Entry<String, Table>> iterator = entry.getValue().tableMap.entrySet().iterator();
+ while (iterator.hasNext()) {
+ Map.Entry<String, Table> tableEntry = iterator.next();
+ String tb = tableEntry.getKey();
+ stmt = conn.prepareStatement(query + db + "." + tb);
+ ResultSet rs;
+ rs = stmt.executeQuery();
+ List<String> colList = tableEntry.getValue().getColList();
+ List<String> DataTypeList = tableEntry.getValue().getRawDataTypeList();
+ List<ColumnParser> ParserList = tableEntry.getValue().getParserList();
+
+ while (rs.next()) {
+ Table table = new Table(db, tb);
+ System.out.print("|");
+ table.setColList(colList);
+ table.setRawDataTypeList(DataTypeList);
+ table.setParserList(ParserList);
+
+ for (String string : colList) {
+ table.getDataList().add(rs.getObject(string));
+ System.out.print(string + " : " + rs.getObject(string) + "|");
+ }
+ list.add(table);
+ System.out.println();
+ }
+ }
+ }
+
+ } catch (SQLException e) {
+ e.printStackTrace();
+ }
+
+ }
+
+ public void start() throws Exception {
+ initDataSource();
+ schema = new Schema(dataSource);
+ schema.load();
+ log.info("schema load successful");
+ }
+
+ private void initDataSource() throws Exception {
+ Map<String, String> map = new HashMap<>();
+ map.put("driverClassName", "com.mysql.cj.jdbc.Driver");
+ map.put("url",
+ "jdbc:mysql://" + config.jdbcUrl + "?useSSL=true&verifyServerCertificate=false&serverTimezone=GMT%2B8");
+ map.put("username", config.jdbcUsername);
+ map.put("password", config.jdbcPassword);
+ map.put("initialSize", "2");
+ map.put("maxActive", "2");
+ map.put("maxWait", "60000");
+ map.put("timeBetweenEvictionRunsMillis", "60000");
+ map.put("minEvictableIdleTimeMillis", "300000");
+ map.put("validationQuery", "SELECT 1 FROM DUAL");
+ map.put("testWhileIdle", "true");
+ log.info("{},config read successful", map);
+ dataSource = DruidDataSourceFactory.createDataSource(map);
+
+ }
}