You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by zh...@apache.org on 2022/03/02 03:20:22 UTC

[rocketmq-connect] 14/43: develop the jdbcsource connector

This is an automated email from the ASF dual-hosted git repository.

zhoubo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-connect.git

commit 488223c3aa3d15ce4676e6442d894cb99b06275e
Author: yuchenlichuck <yu...@126.com>
AuthorDate: Fri Aug 9 12:04:51 2019 +0800

    develop the jdbcsource connector
---
 README.md                                          |  43 ++-
 pom.xml                                            |   8 +
 .../org/apache/rocketmq/connect/jdbc/Config.java   |  52 ++--
 .../jdbc/connector/JdbcSourceConnector.java        |  12 +-
 .../connect/jdbc/connector/JdbcSourceTask.java     |  39 +--
 .../rocketmq/connect/jdbc/schema/Database.java     |   4 +-
 .../rocketmq/connect/jdbc/schema/Schema.java       |   4 +-
 .../apache/rocketmq/connect/jdbc/schema/Table.java |  36 +--
 .../rocketmq/connect/jdbc/source/Querier.java      | 314 ++++++++++-----------
 9 files changed, 270 insertions(+), 242 deletions(-)

diff --git a/README.md b/README.md
index 96da884..32da4b5 100644
--- a/README.md
+++ b/README.md
@@ -149,34 +149,55 @@ httpPort=8081
 
 看到日志目录查看connect_runtime.log
 
-如果看到以下日志说明runttiime启动成功了
-
-2019-07-16 10:56:24 INFO RebalanceService - RebalanceService service started
-2019-07-16 10:56:24 INFO main - The worker [DEFAULT_WORKER_1] boot success.
-
-2、启动sourceConnector
-
-​	正在做测试(To be continued)已实现Bulk Mode
+Windows用户可以在CMD中进入程序根目录,再输入:
 
+```
 cd target/distribution/
 
 java -cp .;./conf/;./lib/* org.apache.rocketmq.connect.runtime.ConnectStartup -c conf/connect.conf
+```
 
+如果看到以下日志说明runtime启动成功了
 
+2019-07-16 10:56:24 INFO RebalanceService - RebalanceService service started
+2019-07-16 10:56:24 INFO main - The worker [DEFAULT_WORKER_1] boot success.
+
+2、启动sourceConnector
 
-在http中输入Get 请求
+```
+1、git clone https://github.com/apache/rocketmq-externals.git
 
+2、cd rocketmq-externals/rocketmq-connect-jdbc
 
+3、mvn -Dmaven.test.skip=true package
 
-示例
+```
 
-[http://127.0.0.1:8085/connectors/testSourceConnector1?config={"connector-class":"org.apache.rocketmq.connect.jdbc.connector.JdbcSourceConnector","jdbcUrl":"127.0.0.1:3306","jdbcUsername":"root","jdbcPassword":"123456","task-class":"org.apache.rocketmq.connect.jdbc.connector.JdbcSourceConnector","rocketmqTopic":"jdbcTopic","mode":"bulk","source-record-converter":"org.apache.rocketmq.connect.runtime.converter.JsonConverter"}](http://127.0.0.1:8085/connectors/testSourceConnector1?config={% [...]
+- 复制第三方jar至target
 
+```
+mvn dependency:copy-dependencies
+```
 
 
 
+已实现Bulk查询方法,通过HTTP发送GET请求即可启动(目前仅适配过MySQL)
 
+```http
+http://127.0.0.1:8081/connectors/testSourceConnector1?config={"connector-class":"org.apache.rocketmq.connect.jdbc.connector.JdbcSourceConnector","jdbcUrl":"127.0.0.1:3306","jdbcUsername":"root","jdbcPassword":"123456","task-class":"org.apache.rocketmq.connect.jdbc.connector.JdbcSourceConnector","rocketmqTopic":"jdbcTopic","mode":"bulk","source-record-converter":"org.apache.rocketmq.connect.runtime.converter.JsonConverter"}
+```
 
+看到以下日志说明Jdbc source connector启动成功了
 
+2019-08-09 11:33:22 INFO RebalanceService - JdbcSourceConnector verifyAndSetConfig enter
+2019-08-09 11:33:23 INFO pool-9-thread-1 - Config.load.start
+2019-08-09 11:33:23 INFO pool-9-thread-1 - querier.start
+2019-08-09 11:33:23 INFO pool-9-thread-1 - {password=199812160, validationQuery=SELECT 1 FROM DUAL, testWhileIdle=true, timeBetweenEvictionRunsMillis=60000, minEvictableIdleTimeMillis=300000, initialSize=2, driverClassName=com.mysql.cj.jdbc.Driver, maxWait=60000, url=jdbc:mysql://localhost:3306?useSSL=true&verifyServerCertificate=false&serverTimezone=GMT%2B8, username=root, maxActive=2},config read successful
+2019-08-09 11:33:24 INFO RebalanceService - JdbcSourceConnector verifyAndSetConfig enter
+2019-08-09 11:33:25 INFO pool-9-thread-1 - {dataSource-1} inited
+2019-08-09 11:33:27 INFO pool-9-thread-1 - schema load successful
+2019-08-09 11:33:27 INFO pool-9-thread-1 - querier.poll
 
+3、启动sinkConnector
 
+To Be Continued.
\ No newline at end of file
diff --git a/pom.xml b/pom.xml
index 830f9ed..1d708f3 100644
--- a/pom.xml
+++ b/pom.xml
@@ -54,6 +54,14 @@
                 <artifactId>clirr-maven-plugin</artifactId>
                 <version>2.7</version>
             </plugin>
+			<plugin> 
+				<artifactId>maven-dependency-plugin</artifactId> 
+				<configuration> 
+				<outputDirectory>${project.build.directory}/lib</outputDirectory> 
+				<excludeTransitive>false</excludeTransitive> 
+				<stripVersion>true</stripVersion> 
+				</configuration> 
+			</plugin> 
             <plugin>
                 <artifactId>maven-compiler-plugin</artifactId>
                 <version>3.6.1</version>
diff --git a/src/main/java/org/apache/rocketmq/connect/jdbc/Config.java b/src/main/java/org/apache/rocketmq/connect/jdbc/Config.java
index 4f7456b..533d53b 100644
--- a/src/main/java/org/apache/rocketmq/connect/jdbc/Config.java
+++ b/src/main/java/org/apache/rocketmq/connect/jdbc/Config.java
@@ -25,56 +25,56 @@ import java.lang.reflect.Method;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
+
 public class Config {
     @SuppressWarnings("serial")
 
     private static final Logger LOG = LoggerFactory.getLogger(Config.class);
 
     /* Database Connection Config */
-    public String jdbcUrl="localhost:3306";
-    public String jdbcUsername="root";
-    public String jdbcPassword="199812160";
+    public String jdbcUrl = "localhost:3306";
+    public String jdbcUsername = "root";
+    public String jdbcPassword = "199812160";
     public String rocketmqTopic;
     public String jdbcBackoff;
     public String jdbcAttempts;
-    public String catalogPattern=null;
+    public String catalogPattern = null;
     public List tableWhitelist;
     public List tableBlacklist;
-    public String schemaPattern=null;
-    public boolean numericPrecisionMapping=false;
-    public String bumericMapping=null;
-    public String dialectName="";
+    public String schemaPattern = null;
+    public boolean numericPrecisionMapping = false;
+    public String bumericMapping = null;
+    public String dialectName = "";
 
     /* Mode Config */
-    public String mode="";
-    public String incrementingColumnName= "";
-    public String query="";
-    public String timestampColmnName="";
-    public boolean validateNonNull=true;
+    public String mode = "";
+    public String incrementingColumnName = "";
+    public String query = "";
+    public String timestampColmnName = "";
+    public boolean validateNonNull = true;
 
     /*Connector config*/
-    public String tableTypes="table";
-    public long pollInterval=5000;
-    public int batchMaxRows=100;
-    public long tablePollInterval=60000;
-    public long timestampDelayInterval=0;
-    public String dbTimezone="UTC";
+    public String tableTypes = "table";
+    public long pollInterval = 5000;
+    public int batchMaxRows = 100;
+    public long tablePollInterval = 60000;
+    public long timestampDelayInterval = 0;
+    public String dbTimezone = "UTC";
     public String queueName;
 
     private Logger log = LoggerFactory.getLogger(Config.class);
     public static final Set<String> REQUEST_CONFIG = new HashSet<String>() {
         {
-          //  add("jdbcUrl");
-          //  add("jdbcUsername");
-         //   add("jdbcPassword");
-        //    add("mode");
-        //    add("rocketmqTopic");
+            add("jdbcUrl");
+            add("jdbcUsername");
+            add("jdbcPassword");
+            //    add("mode");
+            //    add("rocketmqTopic");
         }
     };
 
-
     public void load(KeyValue props) {
-			log.info("Config.load.start");
+        log.info("Config.load.start");
         properties2Object(props, this);
     }
 
diff --git a/src/main/java/org/apache/rocketmq/connect/jdbc/connector/JdbcSourceConnector.java b/src/main/java/org/apache/rocketmq/connect/jdbc/connector/JdbcSourceConnector.java
index 8c30a62..bdbeb8b 100644
--- a/src/main/java/org/apache/rocketmq/connect/jdbc/connector/JdbcSourceConnector.java
+++ b/src/main/java/org/apache/rocketmq/connect/jdbc/connector/JdbcSourceConnector.java
@@ -37,7 +37,7 @@ public class JdbcSourceConnector extends SourceConnector {
     @Override
     public String verifyAndSetConfig(KeyValue config) {
 
-        log.info("1216123 JdbcSourceConnector verifyAndSetConfig enter");
+        log.info("JdbcSourceConnector verifyAndSetConfig enter");
         for (String requestKey : Config.REQUEST_CONFIG) {
 
             if (!config.containsKey(requestKey)) {
@@ -45,7 +45,7 @@ public class JdbcSourceConnector extends SourceConnector {
             }
         }
         this.config = config;
-	
+
         return "";
     }
 
@@ -68,13 +68,13 @@ public class JdbcSourceConnector extends SourceConnector {
     }
 
     @Override
-    public Class<? extends Task> taskClass(){
-	        return JdbcSourceTask.class;
-	}
+    public Class<? extends Task> taskClass() {
+        return JdbcSourceTask.class;
+    }
 
     @Override
     public List<KeyValue> taskConfigs() {
-					log.info("List.start");
+        log.info("List.start");
         List<KeyValue> config = new ArrayList<>();
         config.add(this.config);
         return config;
diff --git a/src/main/java/org/apache/rocketmq/connect/jdbc/connector/JdbcSourceTask.java b/src/main/java/org/apache/rocketmq/connect/jdbc/connector/JdbcSourceTask.java
index 78f1809..91659ec 100644
--- a/src/main/java/org/apache/rocketmq/connect/jdbc/connector/JdbcSourceTask.java
+++ b/src/main/java/org/apache/rocketmq/connect/jdbc/connector/JdbcSourceTask.java
@@ -18,6 +18,7 @@
  */
 
 package org.apache.rocketmq.connect.jdbc.connector;
+
 import io.openmessaging.connector.api.source.SourceTask;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
@@ -47,32 +48,32 @@ public class JdbcSourceTask extends SourceTask {
     private static final Logger log = LoggerFactory.getLogger(JdbcSourceTask.class);
 
     private Config config;
-    
-	private List<Table> list=new LinkedList<>();
-	
-	Querier querier = new Querier();
+
+    private List<Table> list = new LinkedList<>();
+
+    Querier querier = new Querier();
+
     @Override
     public Collection<SourceDataEntry> poll() {
         List<SourceDataEntry> res = new ArrayList<>();
         try {
-            
+
             JSONObject jsonObject = new JSONObject();
             jsonObject.put("nextQuery", "database");
             jsonObject.put("nextPosition", "10");
-        	//To be Continued
-			log.info("querier.poll");
+            //To be Continued
             querier.poll();
-									log.info("1216connector.start");
-			int mm=0;
-            for(Table dataRow : querier.getList()){
-            	System.out.println(dataRow.getColList().get(0));
-										log.info("xunhuankaishi");
+            log.info("querier.poll, start");
+			int mm = 0;
+            for (Table dataRow : querier.getList()) {
+                System.out.println(dataRow.getColList().get(0));
+                log.info("xunhuankaishi");
                 log.info("Received {} record: {} ", dataRow.getColList().get(0), mm++);
-				Schema schema = new Schema();
+                Schema schema = new Schema();
                 schema.setDataSource(dataRow.getDatabase());
                 schema.setName(dataRow.getName());
                 schema.setFields(new ArrayList<>());
-                for(int i = 0; i < dataRow.getColList().size(); i++){
+                for (int i = 0; i < dataRow.getColList().size(); i++) {
                     String columnName = dataRow.getColList().get(i);
                     String rawDataType = dataRow.getRawDataTypeList().get(i);
                     Field field = new Field(i, columnName, ColumnParser.mapConnectorFieldType(rawDataType));
@@ -82,10 +83,10 @@ public class JdbcSourceTask extends SourceTask {
                 dataEntryBuilder.timestamp(System.currentTimeMillis())
                     .queue(dataRow.getName())
                     .entryType(EntryType.CREATE);
-                for(int i = 0; i < dataRow.getColList().size(); i++){
-                	Object value=dataRow.getDataList().get(i);
+                for (int i = 0; i < dataRow.getColList().size(); i++) {
+                    Object value = dataRow.getDataList().get(i);
                     System.out.println(1);
-                	System.out.println(dataRow.getColList().get(i)+"|"+value);
+                    System.out.println(dataRow.getColList().get(i) + "|" + value);
                     dataEntryBuilder.putFiled(dataRow.getColList().get(i), JSON.toJSONString(value));
                 }
                 SourceDataEntry sourceDataEntry = dataEntryBuilder.buildSourceDataEntry(
@@ -104,8 +105,8 @@ public class JdbcSourceTask extends SourceTask {
         try {
             this.config = new Config();
             this.config.load(props);
-						log.info("querier.start");
-    		querier.start();
+            log.info("querier.start");
+            querier.start();
 
         } catch (Exception e) {
             log.error("JDBC task start failed.", e);
diff --git a/src/main/java/org/apache/rocketmq/connect/jdbc/schema/Database.java b/src/main/java/org/apache/rocketmq/connect/jdbc/schema/Database.java
index b88661d..657ebb8 100644
--- a/src/main/java/org/apache/rocketmq/connect/jdbc/schema/Database.java
+++ b/src/main/java/org/apache/rocketmq/connect/jdbc/schema/Database.java
@@ -18,6 +18,7 @@
 package org.apache.rocketmq.connect.jdbc.schema;
 
 //import io.openmessaging.mysql.binlog.EventProcessor;
+
 import org.apache.rocketmq.connect.jdbc.schema.column.ColumnParser;
 import java.sql.Connection;
 import java.sql.PreparedStatement;
@@ -36,6 +37,7 @@ public class Database {
     private String name;
     private DataSource dataSource;
     public Map<String, Table> tableMap = new HashMap<String, Table>();
+
     public Database(String name, DataSource dataSource) {
         this.name = name;
         this.dataSource = dataSource;
@@ -59,7 +61,7 @@ public class Database {
                 String dataType = rs.getString(3);
                 String colType = rs.getString(4);
                 String charset = rs.getString(5);
-                
+
                 ColumnParser columnParser = ColumnParser.getColumnParser(dataType, colType, charset);
 
                 if (!tableMap.containsKey(tableName)) {
diff --git a/src/main/java/org/apache/rocketmq/connect/jdbc/schema/Schema.java b/src/main/java/org/apache/rocketmq/connect/jdbc/schema/Schema.java
index 6ce6621..7434bbc 100644
--- a/src/main/java/org/apache/rocketmq/connect/jdbc/schema/Schema.java
+++ b/src/main/java/org/apache/rocketmq/connect/jdbc/schema/Schema.java
@@ -116,8 +116,8 @@ public class Schema {
                 load();
                 break;
             } catch (Exception e) {
-     //           LOGGER.error("Reload schema error.", e);
-            System.out.println("Reload schema error."+e);
+                //           LOGGER.error("Reload schema error.", e);
+                System.out.println("Reload schema error." + e);
             }
         }
     }
diff --git a/src/main/java/org/apache/rocketmq/connect/jdbc/schema/Table.java b/src/main/java/org/apache/rocketmq/connect/jdbc/schema/Table.java
index c0d793d..8c9a42d 100644
--- a/src/main/java/org/apache/rocketmq/connect/jdbc/schema/Table.java
+++ b/src/main/java/org/apache/rocketmq/connect/jdbc/schema/Table.java
@@ -28,7 +28,7 @@ public class Table {
     private List<String> colList = new LinkedList<>();
     private List<ColumnParser> parserList = new LinkedList<>();
     private List<String> rawDataTypeList = new LinkedList<>();
-    private List<Object> dataList =new LinkedList<>();
+    private List<Object> dataList = new LinkedList<>();
 
     public Table(String database, String table) {
         this.database = database;
@@ -40,18 +40,18 @@ public class Table {
     }
 
     public void setParserList(List<ColumnParser> parserList) {
-		this.parserList = parserList;
-	}
+        this.parserList = parserList;
+    }
 
-	public void setRawDataTypeList(List<String> rawDataTypeList) {
-		this.rawDataTypeList = rawDataTypeList;
-	}
+    public void setRawDataTypeList(List<String> rawDataTypeList) {
+        this.rawDataTypeList = rawDataTypeList;
+    }
 
-	public void addParser(ColumnParser columnParser) {
+    public void addParser(ColumnParser columnParser) {
         parserList.add(columnParser);
     }
 
-    public void addRawDataType(String rawDataType){
+    public void addRawDataType(String rawDataType) {
         this.rawDataTypeList.add(rawDataType);
     }
 
@@ -75,16 +75,16 @@ public class Table {
         return parserList;
     }
 
-	public List<Object> getDataList() {
-		return dataList;
-	}
+    public List<Object> getDataList() {
+        return dataList;
+    }
+
+    public void setDataList(List<Object> dataList) {
+        this.dataList = dataList;
+    }
 
-	public void setDataList(List<Object> dataList) {
-		this.dataList = dataList;
-	}
+    public void setColList(List<String> colList) {
+        this.colList = colList;
+    }
 
-	public void setColList(List<String> colList) {
-		this.colList = colList;
-	}
-    
 }
\ No newline at end of file
diff --git a/src/main/java/org/apache/rocketmq/connect/jdbc/source/Querier.java b/src/main/java/org/apache/rocketmq/connect/jdbc/source/Querier.java
index 1f630ea..3d5b2c6 100644
--- a/src/main/java/org/apache/rocketmq/connect/jdbc/source/Querier.java
+++ b/src/main/java/org/apache/rocketmq/connect/jdbc/source/Querier.java
@@ -22,168 +22,164 @@ import org.slf4j.LoggerFactory;
 
 import com.alibaba.druid.pool.DruidDataSourceFactory;
 
-
 import org.apache.rocketmq.connect.jdbc.schema.*;
 import org.apache.rocketmq.connect.jdbc.schema.column.ColumnParser;
 
 public class Querier {
-	static final String JDBC_DRIVER = "com.mysql.cj.jdbc.Driver";
-	private final Logger log = LoggerFactory.getLogger(getClass()); // use concrete subclass
-	protected String topicPrefix;
-	protected String jdbcUrl;
-	private final Queue<Connection> connections = new ConcurrentLinkedQueue<>();
-	private Config config = new Config();
-	private DataSource dataSource;
-	private List<Table> list=new LinkedList<>();
-
-	
-	
-	public List<Table> getList() {
-		return list;
-	}
-
-	public void setList(List<Table> list) {
-		this.list = list;
-	}
-
-	public Connection getConnection() throws SQLException {
-
-		// These config names are the same for both source and sink configs ...
-		String username = config.jdbcUsername;
-		String dbPassword = config.jdbcPassword;
-		jdbcUrl = config.jdbcUrl;
-		Properties properties = new Properties();
-		if (username != null) {
-			properties.setProperty("user", username);
-		}
-		if (dbPassword != null) {
-			properties.setProperty("password", dbPassword);
-		}
-		Connection connection = DriverManager.getConnection(jdbcUrl, properties);
-
-		connections.add(connection);
-		return connection;
-	}
-
-	public void stop() {
-		Connection conn;
-		while ((conn = connections.poll()) != null) {
-			try {
-				conn.close();
-			} catch (Throwable e) {
-				log.warn("Error while closing connection to {}", "jdbc", e);
-			}
-		}
-	}
-
-	protected PreparedStatement createDBPreparedStatement(Connection db) throws SQLException {
-
-		String SQL = "select table_name,column_name,data_type,column_type,character_set_name "
-				+ "from information_schema.columns " + "where table_schema = jdbc_db order by ORDINAL_POSITION";
-
-		log.trace("Creating a PreparedStatement '{}'", SQL);
-		PreparedStatement stmt = db.prepareStatement(SQL);
-		return stmt;
-
-	}
-
-	protected PreparedStatement createPreparedStatement(Connection db, String string) throws SQLException {
-		String query = "select * from " + string;
-		log.trace("Creating a PreparedStatement '{}'", query);
-		PreparedStatement stmt = db.prepareStatement(query);
-		return stmt;
-
-	}
-
-	protected ResultSet executeQuery(PreparedStatement stmt) throws SQLException {
-		return stmt.executeQuery();
-	}
-
-	public static void main(String[] args) throws Exception {
-		Querier querier = new Querier();
-		try {
-			querier.start();
-			querier.poll();
-
-		} catch (SQLException e) {
-			// TODO Auto-generated catch block
-			e.printStackTrace();
-		}
-
-	}
-
-	private Schema schema;
-
-
-	public void poll() {
-		try {
-
-			PreparedStatement stmt;
-			String query = "select * from ";
-			Connection conn = dataSource.getConnection();
-
-			for (Map.Entry<String, Database> entry : schema.dbMap.entrySet()) {
-				String db = entry.getKey();
-								if(!db.contains("jdbc_db"))
-					continue;
-				Iterator<Map.Entry<String, Table>> iterator = entry.getValue().tableMap.entrySet().iterator();
-				while (iterator.hasNext()) {
-					Map.Entry<String, Table> tableEntry = iterator.next();
-					String tb=tableEntry.getKey();
-					stmt = conn.prepareStatement(query+db + "." +tb);
-					ResultSet rs;
-					rs = stmt.executeQuery();
-				    List<String> colList = tableEntry.getValue().getColList();
-				    List<String> DataTypeList = tableEntry.getValue().getRawDataTypeList();
-				    List<ColumnParser> ParserList = tableEntry.getValue().getParserList();
-
-				    while(rs.next()) {
-					    Table table=new Table(db, tb);
-					    System.out.print("|");
-			    		table.setColList(colList);
-			    		table.setRawDataTypeList(DataTypeList);
-			    		table.setParserList(ParserList);
-			    		
-				    	for (String string : colList) {
-				    		table.getDataList().add(rs.getObject(string));
-				    		System.out.print(string+" : "+rs.getObject(string)+"|");
-					}
-				    	list.add(table);
-				    	System.out.println();
-				    }
-				}
-			}
-
-		} catch (SQLException e) {
-			e.printStackTrace();
-		}
-
-	}
-
-	public void start() throws Exception {
-		initDataSource();
-		schema = new Schema(dataSource);
-		schema.load();
-		log.info("schema load successful");
-	}
-
-	private void initDataSource() throws Exception {
-		Map<String, String> map = new HashMap<>();
-		map.put("driverClassName", "com.mysql.cj.jdbc.Driver");
-		map.put("url",
-				"jdbc:mysql://" + config.jdbcUrl + "?useSSL=true&verifyServerCertificate=false&serverTimezone=GMT%2B8");
-		map.put("username", config.jdbcUsername);
-		map.put("password", config.jdbcPassword);
-		map.put("initialSize", "2");
-		map.put("maxActive", "2");
-		map.put("maxWait", "60000");
-		map.put("timeBetweenEvictionRunsMillis", "60000");
-		map.put("minEvictableIdleTimeMillis", "300000");
-		map.put("validationQuery", "SELECT 1 FROM DUAL");
-		map.put("testWhileIdle", "true");
-		log.info("{},config read successful",map);
-		dataSource = DruidDataSourceFactory.createDataSource(map);
-
-	}
+    static final String JDBC_DRIVER = "com.mysql.cj.jdbc.Driver";
+    private final Logger log = LoggerFactory.getLogger(getClass()); // use concrete subclass
+    protected String topicPrefix;
+    protected String jdbcUrl;
+    private final Queue<Connection> connections = new ConcurrentLinkedQueue<>();
+    private Config config = new Config();
+    private DataSource dataSource;
+    private List<Table> list = new LinkedList<>();
+
+    public List<Table> getList() {
+        return list;
+    }
+
+    public void setList(List<Table> list) {
+        this.list = list;
+    }
+
+    public Connection getConnection() throws SQLException {
+
+        // These config names are the same for both source and sink configs ...
+        String username = config.jdbcUsername;
+        String dbPassword = config.jdbcPassword;
+        jdbcUrl = config.jdbcUrl;
+        Properties properties = new Properties();
+        if (username != null) {
+            properties.setProperty("user", username);
+        }
+        if (dbPassword != null) {
+            properties.setProperty("password", dbPassword);
+        }
+        Connection connection = DriverManager.getConnection(jdbcUrl, properties);
+
+        connections.add(connection);
+        return connection;
+    }
+
+    public void stop() {
+        Connection conn;
+        while ((conn = connections.poll()) != null) {
+            try {
+                conn.close();
+            } catch (Throwable e) {
+                log.warn("Error while closing connection to {}", "jdbc", e);
+            }
+        }
+    }
+
+    protected PreparedStatement createDBPreparedStatement(Connection db) throws SQLException {
+
+        String SQL = "select table_name,column_name,data_type,column_type,character_set_name "
+            + "from information_schema.columns " + "where table_schema = jdbc_db order by ORDINAL_POSITION";
+
+        log.trace("Creating a PreparedStatement '{}'", SQL);
+        PreparedStatement stmt = db.prepareStatement(SQL);
+        return stmt;
+
+    }
+
+    protected PreparedStatement createPreparedStatement(Connection db, String string) throws SQLException {
+        String query = "select * from " + string;
+        log.trace("Creating a PreparedStatement '{}'", query);
+        PreparedStatement stmt = db.prepareStatement(query);
+        return stmt;
+
+    }
+
+    protected ResultSet executeQuery(PreparedStatement stmt) throws SQLException {
+        return stmt.executeQuery();
+    }
+
+    public static void main(String[] args) throws Exception {
+        Querier querier = new Querier();
+        try {
+            querier.start();
+            querier.poll();
+
+        } catch (SQLException e) {
+            // TODO Auto-generated catch block
+            e.printStackTrace();
+        }
+
+    }
+
+    private Schema schema;
+
+    public void poll() {
+        try {
+
+            PreparedStatement stmt;
+            String query = "select * from ";
+            Connection conn = dataSource.getConnection();
+
+            for (Map.Entry<String, Database> entry : schema.dbMap.entrySet()) {
+                String db = entry.getKey();
+                if (!db.contains("jdbc_db"))
+                    continue;
+                Iterator<Map.Entry<String, Table>> iterator = entry.getValue().tableMap.entrySet().iterator();
+                while (iterator.hasNext()) {
+                    Map.Entry<String, Table> tableEntry = iterator.next();
+                    String tb = tableEntry.getKey();
+                    stmt = conn.prepareStatement(query + db + "." + tb);
+                    ResultSet rs;
+                    rs = stmt.executeQuery();
+                    List<String> colList = tableEntry.getValue().getColList();
+                    List<String> DataTypeList = tableEntry.getValue().getRawDataTypeList();
+                    List<ColumnParser> ParserList = tableEntry.getValue().getParserList();
+
+                    while (rs.next()) {
+                        Table table = new Table(db, tb);
+                        System.out.print("|");
+                        table.setColList(colList);
+                        table.setRawDataTypeList(DataTypeList);
+                        table.setParserList(ParserList);
+
+                        for (String string : colList) {
+                            table.getDataList().add(rs.getObject(string));
+                            System.out.print(string + " : " + rs.getObject(string) + "|");
+                        }
+                        list.add(table);
+                        System.out.println();
+                    }
+                }
+            }
+
+        } catch (SQLException e) {
+            e.printStackTrace();
+        }
+
+    }
+
+    public void start() throws Exception {
+        initDataSource();
+        schema = new Schema(dataSource);
+        schema.load();
+        log.info("schema load successful");
+    }
+
+    private void initDataSource() throws Exception {
+        Map<String, String> map = new HashMap<>();
+        map.put("driverClassName", "com.mysql.cj.jdbc.Driver");
+        map.put("url",
+            "jdbc:mysql://" + config.jdbcUrl + "?useSSL=true&verifyServerCertificate=false&serverTimezone=GMT%2B8");
+        map.put("username", config.jdbcUsername);
+        map.put("password", config.jdbcPassword);
+        map.put("initialSize", "2");
+        map.put("maxActive", "2");
+        map.put("maxWait", "60000");
+        map.put("timeBetweenEvictionRunsMillis", "60000");
+        map.put("minEvictableIdleTimeMillis", "300000");
+        map.put("validationQuery", "SELECT 1 FROM DUAL");
+        map.put("testWhileIdle", "true");
+        log.info("{},config read successful", map);
+        dataSource = DruidDataSourceFactory.createDataSource(map);
+
+    }
 
 }