You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by zh...@apache.org on 2022/03/02 03:20:16 UTC

[rocketmq-connect] 08/43: Add JdbcSourceTask

This is an automated email from the ASF dual-hosted git repository.

zhoubo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-connect.git

commit 5a83890b55770820325d32aa7c86df6b477c37b7
Author: yuchenlichuck <yu...@126.com>
AuthorDate: Mon Jul 29 21:20:48 2019 +0800

    Add JdbcSourceTask
---
 .../org/apache/rocketmq/connect/jdbc/Config.java   |  12 +-
 .../apache/rocketmq/connect/jdbc/Replicator.java   | 118 +++++++++++++
 .../connect/jdbc/connector/JdbcSourceTask.java     | 107 +++++++++++-
 .../rocketmq/connect/jdbc/schema/Database.java     | 100 +++++++++++
 .../rocketmq/connect/jdbc/schema/Schema.java       | 128 ++++++++++++++
 .../apache/rocketmq/connect/jdbc/schema/Table.java |  90 ++++++++++
 .../column/BigIntColumnParser.java}                |  40 ++++-
 .../connect/jdbc/schema/column/ColumnParser.java   | 104 ++++++++++++
 .../jdbc/schema/column/DateTimeColumnParser.java   |  53 ++++++
 .../column/DefaultColumnParser.java}               |  27 ++-
 .../column/EnumColumnParser.java}                  |  36 +++-
 .../jdbc/schema/column/IntColumnParser.java        |  66 ++++++++
 .../jdbc/schema/column/SetColumnParser.java        |  54 ++++++
 .../jdbc/schema/column/StringColumnParser.java     |  57 +++++++
 .../column/TimeColumnParser.java}                  |  29 +++-
 .../column/YearColumnParser.java}                  |  30 +++-
 .../rocketmq/connect/jdbc/source/Querier.java      | 187 +++++++++++++++++++++
 17 files changed, 1181 insertions(+), 57 deletions(-)

diff --git a/src/main/java/org/apache/rocketmq/connect/jdbc/Config.java b/src/main/java/org/apache/rocketmq/connect/jdbc/Config.java
index 217b449..69ff9b0 100644
--- a/src/main/java/org/apache/rocketmq/connect/jdbc/Config.java
+++ b/src/main/java/org/apache/rocketmq/connect/jdbc/Config.java
@@ -31,9 +31,9 @@ public class Config {
     private static final Logger LOG = LoggerFactory.getLogger(Config.class);
 
     /* Database Connection Config */
-    public String jdbcUrl;
-    public String jdbcUsername;
-    public String jdbcPassword;
+    public String jdbcUrl="localhost:3306";
+    public String jdbcUsername="root";
+    public String jdbcPassword="199812160";
     public String rocketmqTopic;
     public String jdbcBackoff;
     public String jdbcAttempts;
@@ -54,7 +54,7 @@ public class Config {
 
     /*Connector config*/
     public String tableTypes="table";
-    public int pollInterval=5000;
+    public long pollInterval=5000;
     public int batchMaxRows=100;
     public long tablePollInterval=60000;
     public long timestampDelayInterval=0;
@@ -67,7 +67,7 @@ public class Config {
             add("jdbcUsername");
             add("jdbcPassword");
             add("mode");
-            add("queueName");
+            add("rocketmqTopic");
         }
     };
 
@@ -278,7 +278,7 @@ public class Config {
         this.tableTypes = tableTypes;
     }
 
-    public int getPollInterval() {
+    public long getPollInterval() {
         return pollInterval;
     }
 
diff --git a/src/main/java/org/apache/rocketmq/connect/jdbc/Replicator.java b/src/main/java/org/apache/rocketmq/connect/jdbc/Replicator.java
new file mode 100644
index 0000000..b24b7e5
--- /dev/null
+++ b/src/main/java/org/apache/rocketmq/connect/jdbc/Replicator.java
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.connect.jdbc;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Holds the task {@link Config}, owns an {@code EventProcessor}, queues committed
+ * {@code Transaction}s and remembers the most recently completed position so that it
+ * can be written to the position log.
+ */
+public class Replicator {
+
+    private static final Logger LOGGER = LoggerFactory.getLogger(Replicator.class);
+
+    private static final Logger POSITION_LOGGER = LoggerFactory.getLogger("PositionLogger");
+
+    private final Config config;
+
+    private EventProcessor eventProcessor;
+
+    /** Guards {@code xid}, {@code nextBinlogPosition} and {@code nextQueueOffset}. */
+    private final Object lock = new Object();
+    private BinlogPosition nextBinlogPosition;
+    private long nextQueueOffset;
+    private long xid;
+    private final BlockingQueue<Transaction> queue = new LinkedBlockingQueue<>();
+
+    /**
+     * @param config configuration handed back by {@link #getConfig()}
+     */
+    public Replicator(Config config) {
+        this.config = config;
+    }
+
+    /**
+     * Creates and starts the event processor. Any failure is logged and swallowed, so
+     * the processor may be absent afterwards.
+     */
+    public void start() {
+        try {
+            eventProcessor = new EventProcessor(this);
+            eventProcessor.start();
+        } catch (Exception e) {
+            LOGGER.error("Start error.", e);
+        }
+    }
+
+    /**
+     * Stops the event processor if one was created; does nothing but log otherwise.
+     */
+    public void stop() {
+        // start() swallows its exceptions, so the processor can legitimately be null here.
+        if (eventProcessor == null) {
+            LOGGER.warn("Stop requested but no event processor was started.");
+            return;
+        }
+        eventProcessor.stop();
+    }
+
+    /**
+     * Enqueues the transaction and, when it is complete, records its xid and next
+     * binlog position as the latest known position.
+     *
+     * @param transaction transaction to enqueue
+     * @param isComplete  whether the position bookkeeping should be updated
+     */
+    public void commit(Transaction transaction, boolean isComplete) {
+
+        queue.add(transaction);
+        // Up to three attempts; an attempt that does not throw ends the loop.
+        for (int i = 0; i < 3; i++) {
+            try {
+                if (isComplete) {
+                    // NOTE(review): the queue offset is hard-coded to 1 - looks like a placeholder.
+                    long offset = 1;
+                    synchronized (lock) {
+                        xid = transaction.getXid();
+                        nextBinlogPosition = transaction.getNextBinlogPosition();
+                        nextQueueOffset = offset;
+                    }
+                }
+                break;
+
+            } catch (Exception e) {
+                LOGGER.error("Push error,retry:" + (i + 1) + ",", e);
+            }
+        }
+    }
+
+    /**
+     * Writes the latest recorded position to the position logger. Nothing is written
+     * until a complete transaction has been committed.
+     */
+    public void logPosition() {
+
+        String binlogFilename = null;
+        long xid = 0L;
+        long nextPosition = 0L;
+        long nextOffset = 0L;
+
+        // Copy the shared state under the lock, log outside of it.
+        synchronized (lock) {
+            if (nextBinlogPosition != null) {
+                xid = this.xid;
+                binlogFilename = nextBinlogPosition.getBinlogFilename();
+                nextPosition = nextBinlogPosition.getPosition();
+                nextOffset = nextQueueOffset;
+            }
+        }
+
+        if (binlogFilename != null) {
+            POSITION_LOGGER.info("XID: {},   BINLOG_FILE: {},   NEXT_POSITION: {},   NEXT_OFFSET: {}",
+                xid, binlogFilename, nextPosition, nextOffset);
+        }
+
+    }
+
+    /**
+     * @return the configuration passed to the constructor
+     */
+    public Config getConfig() {
+        return config;
+    }
+
+    /**
+     * @return the live queue of committed transactions (not a copy)
+     */
+    public BlockingQueue<Transaction> getQueue() {
+        return queue;
+    }
+}
diff --git a/src/main/java/org/apache/rocketmq/connect/jdbc/connector/JdbcSourceTask.java b/src/main/java/org/apache/rocketmq/connect/jdbc/connector/JdbcSourceTask.java
index adb1c62..32ea763 100644
--- a/src/main/java/org/apache/rocketmq/connect/jdbc/connector/JdbcSourceTask.java
+++ b/src/main/java/org/apache/rocketmq/connect/jdbc/connector/JdbcSourceTask.java
@@ -18,11 +18,106 @@
  */
 
 package org.apache.rocketmq.connect.jdbc.connector;
-
 import io.openmessaging.connector.api.source.SourceTask;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.LinkedList;
+import java.util.List;
+import org.apache.rocketmq.connect.jdbc.Config;
+import org.apache.rocketmq.connect.jdbc.Replicator;
+import org.apache.rocketmq.connect.jdbc.schema.Table;
+import org.apache.rocketmq.connect.jdbc.source.Querier;
+import org.apache.rocketmq.connect.jdbc.schema.column.*;
 
-public abstract class JdbcSourceTask extends SourceTask {
-/*
- * To Be Continued
- */
-}
\ No newline at end of file
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.alibaba.fastjson.JSON;
+import io.openmessaging.KeyValue;
+import io.openmessaging.connector.api.data.EntryType;
+import io.openmessaging.connector.api.data.Schema;
+import io.openmessaging.connector.api.data.SourceDataEntry;
+
+import com.alibaba.fastjson.JSONObject;
+import io.openmessaging.connector.api.data.DataEntryBuilder;
+import io.openmessaging.connector.api.data.Field;
+
+/**
+ * Source task that polls a {@link Querier} and converts every {@link Table} it returns
+ * into a {@link SourceDataEntry}.
+ */
+public class JdbcSourceTask extends SourceTask {
+
+    private static final Logger log = LoggerFactory.getLogger(JdbcSourceTask.class);
+
+    /** Never assigned inside this class, so {@link #stop()} must tolerate {@code null}. */
+    private Replicator replicator;
+
+    private Config config;
+
+    private List<Table> list = new LinkedList<>();
+
+    Querier querier = new Querier();
+
+    /**
+     * Polls the querier once and builds one entry per returned table row.
+     *
+     * <p>Any exception is logged and ends the conversion; entries built before the
+     * failure are still returned.
+     *
+     * @return the entries built during this poll, possibly empty
+     */
+    @Override
+    public Collection<SourceDataEntry> poll() {
+        List<SourceDataEntry> res = new ArrayList<>();
+        try {
+            // Fixed position payload attached to every entry. To be continued.
+            JSONObject jsonObject = new JSONObject();
+            jsonObject.put("nextQuery", "database");
+            jsonObject.put("nextPosition", "10");
+            querier.poll();
+            log.debug("Querier returned {} rows", querier.getList().size());
+            for (Table dataRow : querier.getList()) {
+                List<String> columns = dataRow.getColList();
+
+                // Describe the row: one field per column, typed from the raw data type.
+                Schema schema = new Schema();
+                schema.setDataSource(dataRow.getDatabase());
+                schema.setName(dataRow.getName());
+                schema.setFields(new ArrayList<>());
+                for (int i = 0; i < columns.size(); i++) {
+                    String columnName = columns.get(i);
+                    String rawDataType = dataRow.getRawDataTypeList().get(i);
+                    Field field = new Field(i, columnName, ColumnParser.mapConnectorFieldType(rawDataType));
+                    schema.getFields().add(field);
+                }
+
+                DataEntryBuilder dataEntryBuilder = new DataEntryBuilder(schema);
+                dataEntryBuilder.timestamp(System.currentTimeMillis())
+                    .queue(dataRow.getName())
+                    .entryType(EntryType.CREATE);
+                // Each column value is stored as its JSON string form.
+                for (int i = 0; i < columns.size(); i++) {
+                    Object value = dataRow.getDataList().get(i);
+                    log.debug("{}|{}", columns.get(i), value);
+                    dataEntryBuilder.putFiled(columns.get(i), JSON.toJSONString(value));
+                }
+                SourceDataEntry sourceDataEntry = dataEntryBuilder.buildSourceDataEntry(
+                    ByteBuffer.wrap(config.jdbcUrl.getBytes("UTF-8")),
+                    ByteBuffer.wrap(jsonObject.toJSONString().getBytes("UTF-8")));
+                res.add(sourceDataEntry);
+            }
+        } catch (Exception e) {
+            // Log only the URL: serializing the whole config would write the JDBC password to the log.
+            log.error("JDBC task poll error, jdbcUrl:" + (config == null ? null : config.jdbcUrl), e);
+        }
+        return res;
+    }
+
+    /**
+     * Loads the configuration from {@code props} and starts the querier. Failures are
+     * logged and not rethrown.
+     *
+     * @param props task properties
+     */
+    @Override
+    public void start(KeyValue props) {
+        try {
+            this.config = new Config();
+            this.config.load(props);
+            querier.start();
+        } catch (Exception e) {
+            log.error("JDBC task start failed.", e);
+        }
+    }
+
+    /**
+     * Stops the replicator when one is present.
+     */
+    @Override
+    public void stop() {
+        // replicator is never assigned in this class; calling it unguarded always threw an NPE.
+        if (replicator != null) {
+            replicator.stop();
+        }
+    }
+
+    /** Not implemented. */
+    @Override public void pause() {
+
+    }
+
+    /** Not implemented. */
+    @Override public void resume() {
+
+    }
+}
diff --git a/src/main/java/org/apache/rocketmq/connect/jdbc/schema/Database.java b/src/main/java/org/apache/rocketmq/connect/jdbc/schema/Database.java
new file mode 100644
index 0000000..15fb77b
--- /dev/null
+++ b/src/main/java/org/apache/rocketmq/connect/jdbc/schema/Database.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.connect.jdbc.schema;
+
+//import io.openmessaging.mysql.binlog.EventProcessor;
+import org.apache.rocketmq.connect.jdbc.schema.column.ColumnParser;
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.HashMap;
+import java.util.Map;
+import javax.sql.DataSource;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Column metadata of a single database schema, read from
+ * {@code information_schema.columns} and grouped by table.
+ */
+public class Database {
+    private static final String SQL = "select table_name,column_name,data_type,column_type,character_set_name " +
+        "from information_schema.columns " +
+        "where table_schema = ? order by ORDINAL_POSITION";
+    private String name;
+    private DataSource dataSource;
+    /** Tables discovered by {@link #init()}, keyed by table name. */
+    public Map<String, Table> tableMap = new HashMap<String, Table>();
+
+    /**
+     * @param name       schema name used as the {@code table_schema} filter
+     * @param dataSource source of the connection used by {@link #init()}
+     */
+    public Database(String name, DataSource dataSource) {
+        this.name = name;
+        this.dataSource = dataSource;
+    }
+
+    /**
+     * Queries the column metadata of this schema and registers, per table, every
+     * column name, its parser and its raw data type in result-set order.
+     *
+     * @throws SQLException if obtaining the connection or running the query fails
+     */
+    public void init() throws SQLException {
+        // try-with-resources closes ResultSet, statement and connection in reverse
+        // order of creation, even when one of the close() calls throws.
+        try (Connection conn = dataSource.getConnection();
+             PreparedStatement ps = conn.prepareStatement(SQL)) {
+            ps.setString(1, name);
+            try (ResultSet rs = ps.executeQuery()) {
+                while (rs.next()) {
+                    String tableName = rs.getString(1);
+                    String colName = rs.getString(2);
+                    String dataType = rs.getString(3);
+                    String colType = rs.getString(4);
+                    String charset = rs.getString(5);
+
+                    ColumnParser columnParser = ColumnParser.getColumnParser(dataType, colType, charset);
+
+                    if (!tableMap.containsKey(tableName)) {
+                        addTable(tableName);
+                    }
+                    Table table = tableMap.get(tableName);
+                    table.addCol(colName);
+                    table.addParser(columnParser);
+                    table.addRawDataType(dataType);
+                }
+            }
+        }
+    }
+
+    /** Registers an empty {@link Table} for {@code tableName} in {@link #tableMap}. */
+    private void addTable(String tableName) {
+        Table table = new Table(name, tableName);
+        tableMap.put(tableName, table);
+    }
+
+    /**
+     * @param tableName table to look up
+     * @return the table, or {@code null} when it is not in {@link #tableMap}
+     */
+    public Table getTable(String tableName) {
+        return tableMap.get(tableName);
+    }
+}
diff --git a/src/main/java/org/apache/rocketmq/connect/jdbc/schema/Schema.java b/src/main/java/org/apache/rocketmq/connect/jdbc/schema/Schema.java
new file mode 100644
index 0000000..6ce6621
--- /dev/null
+++ b/src/main/java/org/apache/rocketmq/connect/jdbc/schema/Schema.java
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.connect.jdbc.schema;
+
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import javax.sql.DataSource;
+//import io.openmessaging.mysql.binlog.EventProcessor;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * In-memory view of the databases reachable through a {@link DataSource} and of the
+ * tables each of them contains.
+ */
+public class Schema {
+    private static final Logger LOGGER = LoggerFactory.getLogger(Schema.class);
+
+    /** Fetches the database (schema) names. */
+    private static final String SQL = "select schema_name from information_schema.schemata";
+
+    /** Databases that are ignored while loading. */
+    private static final List<String> IGNORED_DATABASES = new ArrayList<>(
+        Arrays.asList("information_schema", "mysql", "performance_schema", "sys")
+    );
+
+    private DataSource dataSource;
+
+    /** Loaded databases keyed by name; {@code null} until loaded or after {@link #reset()}. */
+    public Map<String, Database> dbMap;
+
+    /**
+     * @param dataSource source of the connections used to read the metadata
+     */
+    public Schema(DataSource dataSource) {
+        this.dataSource = dataSource;
+    }
+
+    /**
+     * Replaces {@link #dbMap} with a fresh map, registers every non-ignored database
+     * and then initializes each of them.
+     *
+     * @throws SQLException if listing the databases or initializing one of them fails
+     */
+    public void load() throws SQLException {
+
+        dbMap = new HashMap<>();
+
+        // try-with-resources closes ResultSet, statement and connection in reverse
+        // order of creation, even when one of the close() calls throws.
+        try (Connection conn = dataSource.getConnection();
+             PreparedStatement ps = conn.prepareStatement(SQL);
+             ResultSet rs = ps.executeQuery()) {
+
+            while (rs.next()) {
+                String dbName = rs.getString(1);
+                if (!IGNORED_DATABASES.contains(dbName)) {
+                    // dbMap stores each database under its name.
+                    dbMap.put(dbName, new Database(dbName, dataSource));
+                }
+            }
+        }
+
+        for (Database db : dbMap.values()) {
+            db.init();
+        }
+
+    }
+
+    /**
+     * Looks up a table, loading the schema first when it has not been loaded yet.
+     *
+     * @param dbName    database name
+     * @param tableName table name
+     * @return the table, or {@code null} when the database or the table is unknown
+     */
+    public Table getTable(String dbName, String tableName) {
+
+        if (dbMap == null) {
+            reload();
+        }
+
+        Database database = dbMap.get(dbName);
+        if (database == null) {
+            return null;
+        }
+
+        return database.getTable(tableName);
+    }
+
+    /**
+     * Calls {@link #load()} until it succeeds, logging every failure.
+     */
+    private void reload() {
+
+        // NOTE(review): retries immediately and without limit; consider a backoff.
+        while (true) {
+            try {
+                load();
+                break;
+            } catch (Exception e) {
+                LOGGER.error("Reload schema error.", e);
+            }
+        }
+    }
+
+    /**
+     * Drops the loaded metadata so that the next {@link #getTable} call reloads it.
+     */
+    public void reset() {
+        dbMap = null;
+    }
+}
diff --git a/src/main/java/org/apache/rocketmq/connect/jdbc/schema/Table.java b/src/main/java/org/apache/rocketmq/connect/jdbc/schema/Table.java
new file mode 100644
index 0000000..c0d793d
--- /dev/null
+++ b/src/main/java/org/apache/rocketmq/connect/jdbc/schema/Table.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.connect.jdbc.schema;
+
+import org.apache.rocketmq.connect.jdbc.schema.column.ColumnParser;
+import java.util.LinkedList;
+import java.util.List;
+
+/**
+ * Mutable description of one table: its database and name plus parallel lists of
+ * column names, column parsers, raw data types and data values.
+ */
+public class Table {
+
+    private String database;
+    private String name;
+    private List<String> colList = new LinkedList<>();
+    private List<ColumnParser> parserList = new LinkedList<>();
+    private List<String> rawDataTypeList = new LinkedList<>();
+    private List<Object> dataList = new LinkedList<>();
+
+    /**
+     * @param database name of the owning database
+     * @param table    name of the table
+     */
+    public Table(String database, String table) {
+        this.name = table;
+        this.database = database;
+    }
+
+    // ---- identity ----
+
+    /** @return the owning database name */
+    public String getDatabase() {
+        return this.database;
+    }
+
+    /** @return the table name */
+    public String getName() {
+        return this.name;
+    }
+
+    // ---- columns ----
+
+    /** Appends a column name. */
+    public void addCol(String column) {
+        this.colList.add(column);
+    }
+
+    /** @return the live list of column names */
+    public List<String> getColList() {
+        return this.colList;
+    }
+
+    /** Replaces the list of column names. */
+    public void setColList(List<String> colList) {
+        this.colList = colList;
+    }
+
+    // ---- parsers ----
+
+    /** Appends a column parser. */
+    public void addParser(ColumnParser columnParser) {
+        this.parserList.add(columnParser);
+    }
+
+    /** @return the live list of column parsers */
+    public List<ColumnParser> getParserList() {
+        return this.parserList;
+    }
+
+    /** Replaces the list of column parsers. */
+    public void setParserList(List<ColumnParser> parserList) {
+        this.parserList = parserList;
+    }
+
+    // ---- raw data types ----
+
+    /** Appends a raw data type. */
+    public void addRawDataType(String rawDataType) {
+        this.rawDataTypeList.add(rawDataType);
+    }
+
+    /** @return the live list of raw data types */
+    public List<String> getRawDataTypeList() {
+        return this.rawDataTypeList;
+    }
+
+    /** Replaces the list of raw data types. */
+    public void setRawDataTypeList(List<String> rawDataTypeList) {
+        this.rawDataTypeList = rawDataTypeList;
+    }
+
+    // ---- data ----
+
+    /** @return the live list of data values */
+    public List<Object> getDataList() {
+        return this.dataList;
+    }
+
+    /** Replaces the list of data values. */
+    public void setDataList(List<Object> dataList) {
+        this.dataList = dataList;
+    }
+
+}
\ No newline at end of file
diff --git a/src/main/java/org/apache/rocketmq/connect/jdbc/connector/JdbcSourceTask.java b/src/main/java/org/apache/rocketmq/connect/jdbc/schema/column/BigIntColumnParser.java
similarity index 52%
copy from src/main/java/org/apache/rocketmq/connect/jdbc/connector/JdbcSourceTask.java
copy to src/main/java/org/apache/rocketmq/connect/jdbc/schema/column/BigIntColumnParser.java
index adb1c62..610f07d 100644
--- a/src/main/java/org/apache/rocketmq/connect/jdbc/connector/JdbcSourceTask.java
+++ b/src/main/java/org/apache/rocketmq/connect/jdbc/schema/column/BigIntColumnParser.java
@@ -1,5 +1,3 @@
-
-
 /*
  * Licensed to the Apache Software Foundation (ASF) under one or more
  * contributor license agreements.  See the NOTICE file distributed with
@@ -17,12 +15,36 @@
  * limitations under the License.
  */
 
-package org.apache.rocketmq.connect.jdbc.connector;
+package org.apache.rocketmq.connect.jdbc.schema.column;
 
-import io.openmessaging.connector.api.source.SourceTask;
+import java.math.BigInteger;
 
-public abstract class JdbcSourceTask extends SourceTask {
-/*
- * To Be Continued
- */
-}
\ No newline at end of file
+/**
+ * Parser for {@code bigint} columns that re-interprets negative values of unsigned
+ * columns as their unsigned 64-bit magnitude.
+ */
+public class BigIntColumnParser extends ColumnParser {
+
+    /** 2^64; adding it to a negative long yields the value read as unsigned. */
+    private static final BigInteger TWO_POW_64 = BigInteger.ONE.shiftLeft(64);
+
+    private final boolean signed;
+
+    /**
+     * @param colType full column type text; the column is treated as unsigned when it
+     *                contains {@code unsigned} (this also covers
+     *                {@code unsigned zerofill}) and as signed when it is {@code null}
+     */
+    public BigIntColumnParser(String colType) {
+        this.signed = colType == null || !colType.contains("unsigned");
+    }
+
+    /**
+     * @param value raw column value
+     * @return {@code null} for {@code null}; a {@link BigInteger} unchanged; for any
+     *         other {@link Number} its long value, converted to a {@link BigInteger}
+     *         when the column is unsigned and the long is negative; any other object
+     *         unchanged
+     */
+    @Override
+    public Object getValue(Object value) {
+
+        if (value == null) {
+            return null;
+        }
+
+        if (value instanceof BigInteger) {
+            return value;
+        }
+
+        // Accept every numeric type instead of casting blindly to Long.
+        if (!(value instanceof Number)) {
+            return value;
+        }
+
+        long l = ((Number) value).longValue();
+        if (!signed && l < 0) {
+            return TWO_POW_64.add(BigInteger.valueOf(l));
+        }
+        return l;
+    }
+}
diff --git a/src/main/java/org/apache/rocketmq/connect/jdbc/schema/column/ColumnParser.java b/src/main/java/org/apache/rocketmq/connect/jdbc/schema/column/ColumnParser.java
new file mode 100644
index 0000000..341064e
--- /dev/null
+++ b/src/main/java/org/apache/rocketmq/connect/jdbc/schema/column/ColumnParser.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.connect.jdbc.schema.column;
+
+import io.openmessaging.connector.api.data.FieldType;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+/**
+ * Base class of the per-type column parsers, plus the static lookups that pick a
+ * parser or a connector field type from a column's data type name.
+ */
+public abstract class ColumnParser {
+
+    /** Matches {@code enum(...)} or {@code set(...)}; group 2 is the text between the parentheses. */
+    private static final Pattern ENUM_OR_SET = Pattern.compile("(enum|set)\\((.*)\\)");
+
+    /**
+     * Chooses the parser for a column.
+     *
+     * @param dataType data type name, e.g. {@code int} or {@code varchar}
+     * @param colType  full column type text, passed on to parsers that need it
+     * @param charset  character set name, passed on to the string parser
+     * @return the matching parser; a {@link DefaultColumnParser} for unlisted types
+     */
+    public static ColumnParser getColumnParser(String dataType, String colType, String charset) {
+
+        switch (dataType) {
+            case "tinyint":
+            case "smallint":
+            case "mediumint":
+            case "int":
+                return new IntColumnParser(dataType, colType);
+
+            case "bigint":
+                return new BigIntColumnParser(colType);
+
+            case "tinytext":
+            case "text":
+            case "mediumtext":
+            case "longtext":
+            case "varchar":
+            case "char":
+                return new StringColumnParser(charset);
+
+            case "date":
+            case "datetime":
+            case "timestamp":
+                return new DateTimeColumnParser();
+
+            case "time":
+                return new TimeColumnParser();
+
+            case "year":
+                return new YearColumnParser();
+
+            case "enum":
+                return new EnumColumnParser(colType);
+
+            case "set":
+                return new SetColumnParser(colType);
+
+            default:
+                return new DefaultColumnParser();
+        }
+    }
+
+    /**
+     * Maps a data type name to the connector field type.
+     *
+     * @param dataType data type name
+     * @return the field type; {@code null} for {@code enum} and {@code set};
+     *         {@link FieldType#BYTES} for unlisted types
+     */
+    public static FieldType mapConnectorFieldType(String dataType) {
+
+        switch (dataType) {
+            case "tinyint":
+            case "smallint":
+            case "mediumint":
+            case "int":
+                return FieldType.INT32;
+
+            case "bigint":
+                return FieldType.BIG_INTEGER;
+
+            case "tinytext":
+            case "text":
+            case "mediumtext":
+            case "longtext":
+            case "varchar":
+            case "char":
+                return FieldType.STRING;
+
+            case "date":
+            case "datetime":
+            case "timestamp":
+            case "time":
+            case "year":
+                return FieldType.DATETIME;
+
+            case "enum":
+            case "set":
+                return null;
+
+            default:
+                return FieldType.BYTES;
+        }
+    }
+
+    /**
+     * Extracts the labels of an {@code enum(...)} or {@code set(...)} column type by
+     * removing every single quote and splitting on commas.
+     *
+     * @param colType full column type text
+     * @return the labels, or an empty array when the text has a different form
+     */
+    public static String[] extractEnumValues(String colType) {
+        Matcher labels = ENUM_OR_SET.matcher(colType);
+        if (!labels.matches()) {
+            return new String[] {};
+        }
+        String unquoted = labels.group(2).replace("'", "");
+        return unquoted.split(",");
+    }
+
+    /**
+     * Converts a raw column value into the representation used downstream.
+     *
+     * @param value raw column value
+     * @return the converted value
+     */
+    public abstract Object getValue(Object value);
+
+}
diff --git a/src/main/java/org/apache/rocketmq/connect/jdbc/schema/column/DateTimeColumnParser.java b/src/main/java/org/apache/rocketmq/connect/jdbc/schema/column/DateTimeColumnParser.java
new file mode 100644
index 0000000..8736280
--- /dev/null
+++ b/src/main/java/org/apache/rocketmq/connect/jdbc/schema/column/DateTimeColumnParser.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.connect.jdbc.schema.column;
+
+import java.sql.Timestamp;
+import java.text.SimpleDateFormat;
+import java.util.Date;
+import java.util.TimeZone;
+
+/**
+ * Parser that renders date-time values as {@code yyyy-MM-dd HH:mm:ss} text.
+ */
+public class DateTimeColumnParser extends ColumnParser {
+
+    private static final String PATTERN = "yyyy-MM-dd HH:mm:ss";
+
+    // SimpleDateFormat is not thread-safe, so each thread gets its own instances
+    // instead of sharing static ones.
+    private static final ThreadLocal<SimpleDateFormat> dateTimeFormat =
+        new ThreadLocal<SimpleDateFormat>() {
+            @Override
+            protected SimpleDateFormat initialValue() {
+                return new SimpleDateFormat(PATTERN);
+            }
+        };
+
+    private static final ThreadLocal<SimpleDateFormat> dateTimeUtcFormat =
+        new ThreadLocal<SimpleDateFormat>() {
+            @Override
+            protected SimpleDateFormat initialValue() {
+                SimpleDateFormat format = new SimpleDateFormat(PATTERN);
+                format.setTimeZone(TimeZone.getTimeZone("UTC"));
+                return format;
+            }
+        };
+
+    /**
+     * @param value raw column value
+     * @return {@code null} for {@code null}; a {@link Timestamp} formatted in the
+     *         JVM default time zone; a {@link Long} interpreted as epoch milliseconds
+     *         and formatted in UTC; any other object unchanged
+     */
+    @Override
+    public Object getValue(Object value) {
+
+        if (value == null) {
+            return null;
+        }
+
+        if (value instanceof Timestamp) {
+            return dateTimeFormat.get().format(value);
+        }
+
+        if (value instanceof Long) {
+            return dateTimeUtcFormat.get().format(new Date((Long) value));
+        }
+
+        return value;
+    }
+}
diff --git a/src/main/java/org/apache/rocketmq/connect/jdbc/connector/JdbcSourceTask.java b/src/main/java/org/apache/rocketmq/connect/jdbc/schema/column/DefaultColumnParser.java
similarity index 65%
copy from src/main/java/org/apache/rocketmq/connect/jdbc/connector/JdbcSourceTask.java
copy to src/main/java/org/apache/rocketmq/connect/jdbc/schema/column/DefaultColumnParser.java
index adb1c62..ee3075a 100644
--- a/src/main/java/org/apache/rocketmq/connect/jdbc/connector/JdbcSourceTask.java
+++ b/src/main/java/org/apache/rocketmq/connect/jdbc/schema/column/DefaultColumnParser.java
@@ -1,5 +1,3 @@
-
-
 /*
  * Licensed to the Apache Software Foundation (ASF) under one or more
  * contributor license agreements.  See the NOTICE file distributed with
@@ -17,12 +15,23 @@
  * limitations under the License.
  */
 
-package org.apache.rocketmq.connect.jdbc.connector;
+package org.apache.rocketmq.connect.jdbc.schema.column;
 
-import io.openmessaging.connector.api.source.SourceTask;
+import org.apache.commons.codec.binary.Base64;
 
-public abstract class JdbcSourceTask extends SourceTask {
-/*
- * To Be Continued
- */
-}
\ No newline at end of file
+/**
+ * Fallback parser: byte arrays become Base64 text, everything else passes through.
+ */
+public class DefaultColumnParser extends ColumnParser {
+
+    /**
+     * @param value raw column value
+     * @return the Base64 string for a {@code byte[]}; otherwise {@code value} itself,
+     *         including {@code null}
+     */
+    @Override
+    public Object getValue(Object value) {
+        // instanceof is false for null, so null falls through unchanged.
+        boolean isBinary = value instanceof byte[];
+        return isBinary ? Base64.encodeBase64String((byte[]) value) : value;
+    }
+}
diff --git a/src/main/java/org/apache/rocketmq/connect/jdbc/connector/JdbcSourceTask.java b/src/main/java/org/apache/rocketmq/connect/jdbc/schema/column/EnumColumnParser.java
similarity index 57%
copy from src/main/java/org/apache/rocketmq/connect/jdbc/connector/JdbcSourceTask.java
copy to src/main/java/org/apache/rocketmq/connect/jdbc/schema/column/EnumColumnParser.java
index adb1c62..0fd14ba 100644
--- a/src/main/java/org/apache/rocketmq/connect/jdbc/connector/JdbcSourceTask.java
+++ b/src/main/java/org/apache/rocketmq/connect/jdbc/schema/column/EnumColumnParser.java
@@ -1,5 +1,3 @@
-
-
 /*
  * Licensed to the Apache Software Foundation (ASF) under one or more
  * contributor license agreements.  See the NOTICE file distributed with
@@ -17,12 +15,32 @@
  * limitations under the License.
  */
 
-package org.apache.rocketmq.connect.jdbc.connector;
+package org.apache.rocketmq.connect.jdbc.schema.column;
 
-import io.openmessaging.connector.api.source.SourceTask;
+public class EnumColumnParser extends ColumnParser {
 
-public abstract class JdbcSourceTask extends SourceTask {
-/*
- * To Be Continued
- */
-}
\ No newline at end of file
+    private String[] enumValues;
+
+    /**
+     * Creates a parser for one enum column.
+     *
+     * @param colType column type definition; it is handed to
+     *                {@code extractEnumValues} to obtain the enum labels
+     */
+    public EnumColumnParser(String colType) {
+        this.enumValues = extractEnumValues(colType);
+    }
+
+    /**
+     * Resolves a raw enum column value to its label.
+     *
+     * @param value raw column value: {@code null}, the label itself, or the
+     *              1-based numeric index of the label
+     * @return {@code null} for {@code null} or index 0; the label for a String
+     *         or a numeric index; any other type is returned unchanged
+     * @throws ArrayIndexOutOfBoundsException if a numeric index does not map
+     *         to one of the extracted labels
+     */
+    @Override
+    public Object getValue(Object value) {
+
+        // null, ready-made labels and unrecognised representations are passed
+        // through instead of failing with a ClassCastException.
+        if (!(value instanceof Number)) {
+            return value;
+        }
+
+        // Accept every numeric wrapper (Integer, Long, Short, ...), not only Integer.
+        int index = ((Number) value).intValue();
+        if (index == 0) {
+            return null;
+        }
+
+        // Indexes are 1-based; the label array is 0-based.
+        return enumValues[index - 1];
+    }
+}
diff --git a/src/main/java/org/apache/rocketmq/connect/jdbc/schema/column/IntColumnParser.java b/src/main/java/org/apache/rocketmq/connect/jdbc/schema/column/IntColumnParser.java
new file mode 100644
index 0000000..36c6078
--- /dev/null
+++ b/src/main/java/org/apache/rocketmq/connect/jdbc/schema/column/IntColumnParser.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.connect.jdbc.schema.column;
+
+/**
+ * Column parser for the integer types {@code tinyint}, {@code smallint},
+ * {@code mediumint} and {@code int}.
+ *
+ * <p>Values of unsigned columns that arrive as a negative {@link Integer} are
+ * reinterpreted as unsigned numbers of the column's bit width.
+ */
+public class IntColumnParser extends ColumnParser {
+
+    /** Bit width of the column type; 0 when the data type is not recognised. */
+    private final int bits;
+    /** {@code false} when the column type declares the {@code unsigned} attribute. */
+    private final boolean signed;
+
+    /**
+     * @param dataType column data type name, e.g. {@code "tinyint"} or {@code "int"}
+     * @param colType  full column type, e.g. {@code "int(10) unsigned"}
+     */
+    public IntColumnParser(String dataType, String colType) {
+
+        switch (dataType) {
+            case "tinyint":
+                bits = 8;
+                break;
+            case "smallint":
+                bits = 16;
+                break;
+            case "mediumint":
+                bits = 24;
+                break;
+            case "int":
+                bits = 32;
+                break;
+            default:
+                // Unrecognised data type: keep the width at 0, the value the
+                // field implicitly had before this branch existed.
+                bits = 0;
+                break;
+        }
+
+        // "unsigned" may be followed by "zerofill" (e.g. "int(10) unsigned zerofill"),
+        // so it is not necessarily the last word of the column type.
+        this.signed = !colType.matches(".* unsigned( zerofill)?$");
+    }
+
+    /**
+     * Converts a raw integer column value.
+     *
+     * @param value raw column value, may be {@code null}
+     * @return {@code value} unchanged unless it is a negative {@link Integer}
+     *         read from an unsigned column, in which case the unsigned
+     *         equivalent is returned as a {@link Long}
+     */
+    @Override
+    public Object getValue(Object value) {
+
+        // null, Long and every other non-Integer value are passed through.
+        if (!(value instanceof Integer)) {
+            return value;
+        }
+
+        int i = (Integer) value;
+        // Zero is a valid unsigned value and must not take the wrap-around path.
+        if (signed || i >= 0) {
+            return value;
+        }
+
+        // Negative bit pattern of an unsigned column: shift into the positive range.
+        return (1L << bits) + i;
+    }
+}
diff --git a/src/main/java/org/apache/rocketmq/connect/jdbc/schema/column/SetColumnParser.java b/src/main/java/org/apache/rocketmq/connect/jdbc/schema/column/SetColumnParser.java
new file mode 100644
index 0000000..d1e6bbc
--- /dev/null
+++ b/src/main/java/org/apache/rocketmq/connect/jdbc/schema/column/SetColumnParser.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.connect.jdbc.schema.column;
+
+/**
+ * Column parser for set columns: turns a numeric bitmask into the
+ * comma-separated list of the selected labels.
+ */
+public class SetColumnParser extends ColumnParser {
+
+    /** Labels of the set members; bit {@code i} of the mask selects element {@code i}. */
+    private final String[] enumValues;
+
+    /**
+     * @param colType column type definition; it is handed to
+     *                {@code extractEnumValues} to obtain the member labels
+     */
+    public SetColumnParser(String colType) {
+        enumValues = extractEnumValues(colType);
+    }
+
+    /**
+     * Converts a raw set column value.
+     *
+     * @param value raw column value: {@code null}, the textual form, or a
+     *              numeric bitmask
+     * @return the comma-separated labels for a numeric bitmask (empty string
+     *         when no bit is set); any other value is returned unchanged
+     */
+    @Override
+    public Object getValue(Object value) {
+        // null, Strings and unrecognised representations are passed through
+        // instead of failing with a ClassCastException.
+        if (!(value instanceof Number)) {
+            return value;
+        }
+
+        // Accept every numeric wrapper, not only Long.
+        long mask = ((Number) value).longValue();
+        StringBuilder builder = new StringBuilder();
+
+        // Tracked separately from builder.length() so that an empty label
+        // still gets a separator after it.
+        boolean needSplit = false;
+        for (int i = 0; i < enumValues.length; i++) {
+            if (((mask >> i) & 1) == 1) {
+                if (needSplit) {
+                    builder.append(",");
+                }
+
+                builder.append(enumValues[i]);
+                needSplit = true;
+            }
+        }
+
+        return builder.toString();
+    }
+}
diff --git a/src/main/java/org/apache/rocketmq/connect/jdbc/schema/column/StringColumnParser.java b/src/main/java/org/apache/rocketmq/connect/jdbc/schema/column/StringColumnParser.java
new file mode 100644
index 0000000..cd4f04f
--- /dev/null
+++ b/src/main/java/org/apache/rocketmq/connect/jdbc/schema/column/StringColumnParser.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.connect.jdbc.schema.column;
+
+import org.apache.commons.codec.Charsets;
+
+/**
+ * Column parser for character columns: decodes raw bytes into a
+ * {@link String} using the column's character set.
+ */
+public class StringColumnParser extends ColumnParser {
+
+    /** Lower-cased character set name of the column. */
+    private final String charset;
+
+    /**
+     * @param charset character set name of the column, e.g. {@code "utf8"};
+     *                must not be {@code null}
+     */
+    public StringColumnParser(String charset) {
+        // Locale.ROOT keeps the result independent of the default locale
+        // (a Turkish locale would lower-case "LATIN1" to a dotless i).
+        this.charset = charset.toLowerCase(java.util.Locale.ROOT);
+    }
+
+    /**
+     * Converts a raw character column value.
+     *
+     * @param value raw column value, may be {@code null}
+     * @return the decoded text when {@code value} is a {@code byte[]};
+     *         otherwise {@code value} unchanged
+     */
+    @Override
+    public Object getValue(Object value) {
+
+        // null, Strings and unrecognised representations are passed through
+        // instead of failing with a ClassCastException.
+        if (!(value instanceof byte[])) {
+            return value;
+        }
+
+        byte[] bytes = (byte[]) value;
+
+        switch (charset) {
+            case "utf8":
+            case "utf8mb4":
+                return new String(bytes, Charsets.UTF_8);
+            case "latin1":
+            case "ascii":
+                // NOTE(review): MySQL documents latin1 as cp1252; bytes 0x80-0x9F
+                // decode differently under ISO-8859-1 - confirm this is intended.
+                return new String(bytes, Charsets.ISO_8859_1);
+            case "ucs2":
+                // Fixed big-endian decoding: plain UTF-16 would interpret leading
+                // FE FF / FF FE bytes as a byte-order mark instead of data.
+                return new String(bytes, Charsets.UTF_16BE);
+            default:
+                return new String(bytes, Charsets.toCharset(charset));
+        }
+    }
+}
diff --git a/src/main/java/org/apache/rocketmq/connect/jdbc/connector/JdbcSourceTask.java b/src/main/java/org/apache/rocketmq/connect/jdbc/schema/column/TimeColumnParser.java
similarity index 65%
copy from src/main/java/org/apache/rocketmq/connect/jdbc/connector/JdbcSourceTask.java
copy to src/main/java/org/apache/rocketmq/connect/jdbc/schema/column/TimeColumnParser.java
index adb1c62..9926d81 100644
--- a/src/main/java/org/apache/rocketmq/connect/jdbc/connector/JdbcSourceTask.java
+++ b/src/main/java/org/apache/rocketmq/connect/jdbc/schema/column/TimeColumnParser.java
@@ -1,5 +1,3 @@
-
-
 /*
  * Licensed to the Apache Software Foundation (ASF) under one or more
  * contributor license agreements.  See the NOTICE file distributed with
@@ -17,12 +15,25 @@
  * limitations under the License.
  */
 
-package org.apache.rocketmq.connect.jdbc.connector;
+package org.apache.rocketmq.connect.jdbc.schema.column;
 
-import io.openmessaging.connector.api.source.SourceTask;
+import java.sql.Time;
+import java.sql.Timestamp;
 
-public abstract class JdbcSourceTask extends SourceTask {
-/*
- * To Be Continued
- */
-}
\ No newline at end of file
+/**
+ * Column parser that narrows {@link Timestamp} values to {@link Time} and
+ * leaves every other value untouched.
+ */
+public class TimeColumnParser extends ColumnParser {
+
+    /**
+     * Converts a raw time column value.
+     *
+     * @param value raw column value, may be {@code null}
+     * @return a {@link Time} carrying the same epoch milliseconds when
+     *         {@code value} is a {@link Timestamp}; otherwise {@code value} itself
+     */
+    @Override
+    public Object getValue(Object value) {
+        // instanceof is false for null, so null is returned as-is here.
+        if (!(value instanceof Timestamp)) {
+            return value;
+        }
+
+        long epochMillis = ((Timestamp) value).getTime();
+        return new Time(epochMillis);
+    }
+}
diff --git a/src/main/java/org/apache/rocketmq/connect/jdbc/connector/JdbcSourceTask.java b/src/main/java/org/apache/rocketmq/connect/jdbc/schema/column/YearColumnParser.java
similarity index 61%
copy from src/main/java/org/apache/rocketmq/connect/jdbc/connector/JdbcSourceTask.java
copy to src/main/java/org/apache/rocketmq/connect/jdbc/schema/column/YearColumnParser.java
index adb1c62..14cc798 100644
--- a/src/main/java/org/apache/rocketmq/connect/jdbc/connector/JdbcSourceTask.java
+++ b/src/main/java/org/apache/rocketmq/connect/jdbc/schema/column/YearColumnParser.java
@@ -1,5 +1,3 @@
-
-
 /*
  * Licensed to the Apache Software Foundation (ASF) under one or more
  * contributor license agreements.  See the NOTICE file distributed with
@@ -17,12 +15,26 @@
  * limitations under the License.
  */
 
-package org.apache.rocketmq.connect.jdbc.connector;
+package org.apache.rocketmq.connect.jdbc.schema.column;
 
-import io.openmessaging.connector.api.source.SourceTask;
+import java.sql.Date;
+import java.util.Calendar;
 
-public abstract class JdbcSourceTask extends SourceTask {
-/*
- * To Be Continued
- */
-}
\ No newline at end of file
+/**
+ * Column parser that reduces {@link Date} values to their calendar year and
+ * leaves every other value untouched.
+ */
+public class YearColumnParser extends ColumnParser {
+
+    /**
+     * Converts a raw year column value.
+     *
+     * @param value raw column value, may be {@code null}
+     * @return the year of {@code value}, taken from a default-locale and
+     *         default-time-zone {@link Calendar}, when it is a {@link Date};
+     *         otherwise {@code value} itself
+     */
+    @Override
+    public Object getValue(Object value) {
+        // instanceof is false for null, so null is returned as-is here.
+        if (!(value instanceof Date)) {
+            return value;
+        }
+
+        Calendar cal = Calendar.getInstance();
+        cal.setTime((Date) value);
+        return cal.get(Calendar.YEAR);
+    }
+}
diff --git a/src/main/java/org/apache/rocketmq/connect/jdbc/source/Querier.java b/src/main/java/org/apache/rocketmq/connect/jdbc/source/Querier.java
new file mode 100644
index 0000000..61323d4
--- /dev/null
+++ b/src/main/java/org/apache/rocketmq/connect/jdbc/source/Querier.java
@@ -0,0 +1,187 @@
+package org.apache.rocketmq.connect.jdbc.source;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Queue;
+import java.util.concurrent.ConcurrentLinkedQueue;
+
+import javax.sql.DataSource;
+
+import org.apache.rocketmq.connect.jdbc.Config;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.alibaba.druid.pool.DruidDataSourceFactory;
+
+
+import org.apache.rocketmq.connect.jdbc.schema.*;
+import org.apache.rocketmq.connect.jdbc.schema.column.ColumnParser;
+
+/**
+ * Loads schema metadata through a Druid-backed {@link DataSource} and reads
+ * all rows of every table whose database name contains {@code "jdbc_db"}.
+ * Each row read by {@link #poll()} is appended to {@link #getList()} as a
+ * {@link Table} carrying the column names, raw data types, parsers and values.
+ */
+public class Querier {
+    static final String JDBC_DRIVER = "com.mysql.cj.jdbc.Driver";
+    /** Database name this querier is currently hard-wired to. */
+    private static final String TARGET_DATABASE = "jdbc_db";
+
+    private final Logger log = LoggerFactory.getLogger(getClass()); // use concrete subclass
+    protected String topicPrefix;
+    protected String jdbcUrl;
+    /** Connections handed out by {@link #getConnection()}, closed by {@link #close()}. */
+    private final Queue<Connection> connections = new ConcurrentLinkedQueue<>();
+    private Config config = new Config();
+    private DataSource dataSource;
+    private Schema schema;
+    private List<Table> list = new LinkedList<>();
+
+    /**
+     * @return the rows collected so far, one {@link Table} per row
+     */
+    public List<Table> getList() {
+        return list;
+    }
+
+    /**
+     * @param list the list that subsequent {@link #poll()} calls append rows to
+     */
+    public void setList(List<Table> list) {
+        this.list = list;
+    }
+
+    /**
+     * Opens a new connection through {@link DriverManager} using the
+     * configured URL and credentials and remembers it for {@link #close()}.
+     *
+     * @return the newly opened connection
+     * @throws SQLException if the connection cannot be established
+     */
+    public Connection getConnection() throws SQLException {
+
+        // These config names are the same for both source and sink configs ...
+        String username = config.jdbcUsername;
+        String dbPassword = config.jdbcPassword;
+        // NOTE(review): initDataSource() prefixes config.jdbcUrl with "jdbc:mysql://"
+        // while it is passed to DriverManager unchanged here - confirm which
+        // form Config.jdbcUrl holds.
+        jdbcUrl = config.jdbcUrl;
+        Properties properties = new Properties();
+        if (username != null) {
+            properties.setProperty("user", username);
+        }
+        if (dbPassword != null) {
+            properties.setProperty("password", dbPassword);
+        }
+        Connection connection = DriverManager.getConnection(jdbcUrl, properties);
+
+        connections.add(connection);
+        return connection;
+    }
+
+    /**
+     * Closes every connection obtained from {@link #getConnection()}.
+     * Failures are logged and do not stop the remaining connections from
+     * being closed.
+     */
+    public void close() {
+        Connection conn;
+        while ((conn = connections.poll()) != null) {
+            try {
+                conn.close();
+            } catch (Throwable e) {
+                log.warn("Error while closing connection to {}", "jdbc", e);
+            }
+        }
+    }
+
+    /**
+     * Prepares the metadata query listing the columns of the target database.
+     *
+     * @param db connection to prepare the statement on
+     * @return the prepared statement with the schema name already bound;
+     *         the caller is responsible for closing it
+     * @throws SQLException if the statement cannot be prepared
+     */
+    protected PreparedStatement createDBPreparedStatement(Connection db) throws SQLException {
+
+        // The schema name is bound as a parameter: written bare into the SQL
+        // it is parsed as a column reference, not as a string value.
+        String sql = "select table_name,column_name,data_type,column_type,character_set_name "
+                + "from information_schema.columns " + "where table_schema = ? order by ORDINAL_POSITION";
+
+        log.trace("Creating a PreparedStatement '{}'", sql);
+        PreparedStatement stmt = db.prepareStatement(sql);
+        try {
+            stmt.setString(1, TARGET_DATABASE);
+        } catch (SQLException e) {
+            // Do not leak the statement when binding fails.
+            try {
+                stmt.close();
+            } catch (SQLException closeError) {
+                e.addSuppressed(closeError);
+            }
+            throw e;
+        }
+        return stmt;
+    }
+
+    /**
+     * Prepares a {@code select *} over the given table.
+     *
+     * @param db     connection to prepare the statement on
+     * @param string table reference appended verbatim after {@code from};
+     *               must come from trusted metadata, never from user input
+     * @return the prepared statement; the caller is responsible for closing it
+     * @throws SQLException if the statement cannot be prepared
+     */
+    protected PreparedStatement createPreparedStatement(Connection db, String string) throws SQLException {
+        String query = "select * from " + string;
+        log.trace("Creating a PreparedStatement '{}'", query);
+        return db.prepareStatement(query);
+    }
+
+    /**
+     * @param stmt statement to run
+     * @return the result set produced by {@code stmt}
+     * @throws SQLException if execution fails
+     */
+    protected ResultSet executeQuery(PreparedStatement stmt) throws SQLException {
+        return stmt.executeQuery();
+    }
+
+    /**
+     * Manual entry point: starts a querier and polls once.
+     *
+     * @param args unused
+     * @throws Exception if start-up fails with anything but a {@link SQLException}
+     */
+    public static void main(String[] args) throws Exception {
+        Querier querier = new Querier();
+        try {
+            querier.start();
+            querier.poll();
+        } catch (SQLException e) {
+            querier.log.error("Querier run failed", e);
+        }
+    }
+
+    /**
+     * Reads all rows of every table in the loaded schema whose database name
+     * contains {@code "jdbc_db"} and appends them to {@link #getList()}.
+     * A {@link SQLException} is logged and ends the current poll; rows
+     * collected before the failure stay in the list.
+     */
+    public void poll() {
+        // The pooled connection is returned to the pool when the block exits;
+        // the pool only holds two connections, so leaking one per poll would
+        // soon block every further call.
+        try (Connection conn = dataSource.getConnection()) {
+            for (Map.Entry<String, Database> entry : schema.dbMap.entrySet()) {
+                String db = entry.getKey();
+                if (!db.contains(TARGET_DATABASE)) {
+                    continue;
+                }
+                for (Map.Entry<String, Table> tableEntry : entry.getValue().tableMap.entrySet()) {
+                    pollTable(conn, db, tableEntry.getKey(), tableEntry.getValue());
+                }
+            }
+        } catch (SQLException e) {
+            log.error("Failed to poll tables", e);
+        }
+    }
+
+    /** Reads every row of one table and appends each as a new {@link Table} to the result list. */
+    private void pollTable(Connection conn, String db, String tb, Table meta) throws SQLException {
+        List<String> colList = meta.getColList();
+        List<String> dataTypeList = meta.getRawDataTypeList();
+        List<ColumnParser> parserList = meta.getParserList();
+
+        try (PreparedStatement stmt = conn.prepareStatement("select * from " + db + "." + tb);
+             ResultSet rs = stmt.executeQuery()) {
+            while (rs.next()) {
+                Table table = new Table(db, tb);
+                table.setColList(colList);
+                table.setRawDataTypeList(dataTypeList);
+                table.setParserList(parserList);
+
+                for (String column : colList) {
+                    table.getDataList().add(rs.getObject(column));
+                }
+                list.add(table);
+                log.debug("Polled row from {}.{}: {}", db, tb, table.getDataList());
+            }
+        }
+    }
+
+    /**
+     * Creates the connection pool and loads the schema metadata.
+     *
+     * @throws Exception if the pool cannot be created or the schema cannot be loaded
+     */
+    public void start() throws Exception {
+        initDataSource();
+        schema = new Schema(dataSource);
+        schema.load();
+    }
+
+    /** Builds the Druid connection pool from the configured URL and credentials. */
+    private void initDataSource() throws Exception {
+        Map<String, String> map = new HashMap<>();
+        map.put("driverClassName", JDBC_DRIVER);
+        map.put("url",
+                "jdbc:mysql://" + config.jdbcUrl + "?useSSL=true&verifyServerCertificate=false&serverTimezone=GMT%2B8");
+        map.put("username", config.jdbcUsername);
+        map.put("password", config.jdbcPassword);
+        map.put("initialSize", "2");
+        map.put("maxActive", "2");
+        map.put("maxWait", "60000");
+        map.put("timeBetweenEvictionRunsMillis", "60000");
+        map.put("minEvictableIdleTimeMillis", "300000");
+        map.put("validationQuery", "SELECT 1 FROM DUAL");
+        map.put("testWhileIdle", "true");
+        dataSource = DruidDataSourceFactory.createDataSource(map);
+    }
+
+}